JMS 是满足持久阻塞队列需求的答案吗?

发布于 2024-10-03 02:04:52 字数 491 浏览 8 评论 0原文

我正在创建一个由 Log4J 附加程序组成的库,该附加程序将事件异步发送到远程服务器。当发出日志语句时,附加程序将异步地将事件记录到本地队列中,然后消费者池将检索该事件并将其发送到远程。

完全内存中的解决方案是创建一个 BlockingQueue 来处理并发问题。但是,我希望队列能够被持久化,这样如果远程服务器不可用,我就不会无限制地增长队列,或者在有界队列的情况下开始丢弃消息。

我正在考虑使用嵌入式 H2 数据库在本地存储事件,然后使用轮询机制来检索事件并将其发送到远程。我宁愿使用 BlockingQueue 而不是轮询数据库表。

JMS 是答案吗?

编辑:

如果 JMS 是答案,而且似乎正在朝着这个方向发展,那么是否有人对轻量级、可嵌入 JMS 解决方案有建议,该解决方案可以配置为仅接受进程中的消息?换句话说,我不想,也可能不会被允许,打开一个 TCP 套接字来侦听。

编辑:

我现在已经嵌入了 ActiveMQ,它似乎正在工作。谢谢大家。

I'm creating a library that consists of a Log4J appender that asynchronously sends events to a remote server. When a log statement is made, the appender will asynchronously record the event into a local queue which a pool of consumers will then retrieve and send to the remote.

The completely in-memory solution would be to create a BlockingQueue which would handle the concurrency issue. However, I'd like for the queue to be persisted so that if the remote server is not available I don't grow the queue unbounded or start to discard messages in the case of a bounded queue.

I was thinking of using an embedded H2 database to store the events locally and then use a polling mechanism to retrieve events and send to the remote. I would much rather use a BlockingQueue than to poll a database table.

Is JMS the answer?

EDIT:

If JMS is the answer, and it seems to be going that way, does anyone have recommendations on a lightweight, embeddable JMS solution that can be configured to only accept messages in-process? In other words, I do not want to, and possibly will not be allowed to, open up a TCP socket on which to listen.

EDIT:

I've got ActiveMQ embedded now and it seems to be working. Thanks all.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

森林迷了鹿 2024-10-10 02:04:52

Bob Lee 不久前开源了一个非常简单的磁盘支持队列,https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/QueueFile.java - 可能会有所帮助,而且肯定要容易得多如果你能接受本地持久性,就引入比JMS 更好的方法。

该类是独立的——可以复制和粘贴。

Bob Lee open sourced a very simple disk backed queue a while back, https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/QueueFile.java -- may be helpful, and is certainly a lot easier to introduce than JMS if you can accept local durability.

This class is standalone -- it can be copied and pasted.

情感失落者 2024-10-10 02:04:52

您可以使用 JMS 将消息异步发送到远程计算机(假设它当然可以接收它们),Log4j 有一个 JMS Appender 您可以用于此目的。

You could use JMS to asynchronously send messages to a remote machine (assuming it can receive them of course), Log4j has a JMS Appender you can use for this.

小红帽 2024-10-10 02:04:52

您绝对可以使用 JMS 来实现此目的。据我了解,您正在使用 Log4J JMS 附加程序。该组件将消息发送到预先配置的 JMS 目标(通常是队列)。您可以将此队列配置为持久化。在这种情况下,插入队列的所有消息将自动存储在某个持久存储中(通常是数据库)。不幸的是,此配置是特定于供应商的(取决于 JMS 供应商),但通常非常简单。请参阅您的 JMS 提供商的文档。

You can definitely use JMS for this purpose. As far as I understand you are using the Log4J JMS appender. This component sends messages to pre-configured JMS destination (typically queue). You can configure this queue to be persisted. In this case all messages inserted into the queue will be automatically stored in some persisted store (typically database.). Unfortunately this configuration is vendor specific (depends on the JMS vendor), but usually is very simple. Please refer to the documentation of you JMS provider.

独木成林 2024-10-10 02:04:52

看看这是否有效

此代码应该适合您 - 它是内存中的持久阻塞队列 - 需要一些文件调整但应该可以工作

       package test;

     import java.io.BufferedReader;
     import java.io.BufferedWriter;
     import java.io.File;
     import java.io.FileReader;
     import java.io.FileWriter;
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.LinkedList;
     import java.util.List;

     public class BlockingQueue {

    //private static Long maxInMenorySize = 1L;
    private static Long minFlushSize = 3L;

    private static String baseDirectory = "/test/code/cache/";
    private static String fileNameFormat = "Table-";

    private static String  currentWriteFile = "";

    private static List<Object>  currentQueue = new LinkedList<Object>();
    private static List<Object>  lastQueue = new LinkedList<Object>();

    static{
        try {
            load();
        } catch (IOException e) {
            System.out.println("Unable To Load");
            e.printStackTrace();
        }
    }

    private static void load() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()==0){
            //currentQueue = lastQueue = new ArrayList<Object>();
            currentWriteFile = baseDirectory + "Table-1";
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString()+ "\n");
                lastQueue.remove(0);
            }
            writer.close();
        }else{
            if(fileList.size()>0){
                    BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                    String line=null;
                    while ((line=reader.readLine())!=null){
                        currentQueue.add(line);
                    }
                    reader.close();
                    File toDelete = new File(fileList.get(0));
                    toDelete.delete();
            }

            if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
                currentWriteFile = fileList.get(fileList.size()-1);
                String line=null;
                while ((line=reader.readLine())!=null){
                    lastQueue.add(line);
                }
                reader.close();
                //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
            }
        }

    }

    private void loadFirst() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                String line=null;
                while ((line=reader.readLine())!=null){
                    currentQueue.add(line);
                }
                reader.close();
                File toDelete = new File(fileList.get(0));
                toDelete.delete();
        }
    }

    public Object pop(){
        if(currentQueue.size()>0)
            return  currentQueue.remove(0);

        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if(currentQueue.size()>0)
            return  currentQueue.remove(0);
        else
            return null;
    }

    public synchronized Object waitTillPop() throws InterruptedException{
        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if(currentQueue.size()==0)
                wait();
        }
        return currentQueue.remove(0);
    }

    public synchronized void push(Object data) throws IOException{
        lastQueue.add(data);
        this.notifyAll();
        if(lastQueue.size()>=minFlushSize){
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString() + "\n");
                lastQueue.remove(0);
            }
            writer.close();

            currentWriteFile  = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
                    (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
        }
    }

    public static void main(String[] args) {
        try {
            BlockingQueue bq = new BlockingQueue();

            for(int i =0 ; i<=8 ; i++){
                bq.push(""+i);
            }

            System.out.println(bq.pop());
            System.out.println(bq.pop());
            System.out.println(bq.pop());

            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());



        } catch (Exception e) {
            e.printStackTrace();
        }
    }

See if this works

This code should work for you - its an in memory persistent blocking queue - needs some file tuning but should work

       package test;

     import java.io.BufferedReader;
     import java.io.BufferedWriter;
     import java.io.File;
     import java.io.FileReader;
     import java.io.FileWriter;
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.LinkedList;
     import java.util.List;

     public class BlockingQueue {

    //private static Long maxInMenorySize = 1L;
    private static Long minFlushSize = 3L;

    private static String baseDirectory = "/test/code/cache/";
    private static String fileNameFormat = "Table-";

    private static String  currentWriteFile = "";

    private static List<Object>  currentQueue = new LinkedList<Object>();
    private static List<Object>  lastQueue = new LinkedList<Object>();

    static{
        try {
            load();
        } catch (IOException e) {
            System.out.println("Unable To Load");
            e.printStackTrace();
        }
    }

    private static void load() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()==0){
            //currentQueue = lastQueue = new ArrayList<Object>();
            currentWriteFile = baseDirectory + "Table-1";
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString()+ "\n");
                lastQueue.remove(0);
            }
            writer.close();
        }else{
            if(fileList.size()>0){
                    BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                    String line=null;
                    while ((line=reader.readLine())!=null){
                        currentQueue.add(line);
                    }
                    reader.close();
                    File toDelete = new File(fileList.get(0));
                    toDelete.delete();
            }

            if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
                currentWriteFile = fileList.get(fileList.size()-1);
                String line=null;
                while ((line=reader.readLine())!=null){
                    lastQueue.add(line);
                }
                reader.close();
                //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
            }
        }

    }

    private void loadFirst() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                String line=null;
                while ((line=reader.readLine())!=null){
                    currentQueue.add(line);
                }
                reader.close();
                File toDelete = new File(fileList.get(0));
                toDelete.delete();
        }
    }

    public Object pop(){
        if(currentQueue.size()>0)
            return  currentQueue.remove(0);

        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if(currentQueue.size()>0)
            return  currentQueue.remove(0);
        else
            return null;
    }

    public synchronized Object waitTillPop() throws InterruptedException{
        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if(currentQueue.size()==0)
                wait();
        }
        return currentQueue.remove(0);
    }

    public synchronized void push(Object data) throws IOException{
        lastQueue.add(data);
        this.notifyAll();
        if(lastQueue.size()>=minFlushSize){
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString() + "\n");
                lastQueue.remove(0);
            }
            writer.close();

            currentWriteFile  = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
                    (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
        }
    }

    public static void main(String[] args) {
        try {
            BlockingQueue bq = new BlockingQueue();

            for(int i =0 ; i<=8 ; i++){
                bq.push(""+i);
            }

            System.out.println(bq.pop());
            System.out.println(bq.pop());
            System.out.println(bq.pop());

            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());



        } catch (Exception e) {
            e.printStackTrace();
        }
    }
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文