Java中动态创建异步消息队列

发布于 2024-08-09 02:29:23 字数 218 浏览 14 评论 0原文

我需要在 Java 中动态创建异步消息队列。我的用例是通过多个 SMTP 服务器发送电子邮件:我需要强制顺序处理发送到同一 SMTP 服务器的电子邮件,但发送到不同 SMTP 服务器的电子邮件可能会同时处理。我过去使用过 JMS,但据我所知,它只允许在编译时创建队列,而我需要在运行时创建队列(每个 SMTP 服务器一个队列)。

我是否遗漏了有关 JMS 的内容,或者是否有其他一些我应该查看的工具/建议?

I need to create asynchronous message queues dynamically in Java. My use case is sending email via multiple SMTP servers: I need to enforce that emails to the same SMTP server are processes sequentially, but emails to different SMTP servers may be processed concurrently. I've used JMS in the past, but as far as I can see it only allows for compile-time queue creation, whereas I need to create queues at runtime (one queue for each SMTP server).

Am I missing something regarding JMS or is there some other tool/proposal which I should have a look at?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

萌能量女王 2024-08-16 02:29:23

我同意 Adam 的观点,用例听起来 JMS 是开销。 Java内置功能足够了:

package de.mhaller;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

import org.junit.Assert;
import org.junit.Test;

public class Mailer {

    @Test
    public void testMailer() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        ArrayList<Mail> log = new ArrayList<Mail>();
        LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>();

        // TODO: Put mails to be sent into the incoming queue
        incoming.offer(new Mail("foo1@localhost", "localhost"));
        incoming.offer(new Mail("foo2@otherhost", "otherhost"));
        incoming.offer(new Mail("foo3@otherhost", "otherhost"));
        incoming.offer(new Mail("foo4@localhost", "localhost"));

        Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>();
        while (!incoming.isEmpty()) {
            Mail mail = incoming.pollFirst();
            Mailserver mailserver = findMailserver(mail);
            if (!queues.containsKey(mailserver)) {
                ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>();
                queues.put(mailserver, serverQueue);
                executor.execute(new SendMail(mailserver, serverQueue));
            }
            Queue<Mail> slot = queues.get(mailserver);
            slot.offer(mail);
        }

        assertMailSentWithCorrectServer(log);
    }

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) {
        for (Mail mail : log) {
            if (!mail.server.equals(mail.sentBy.mailserver)) {
                Assert.fail("Mail sent by wrong server: " + mail);
            }
        }
    }

    private Mailserver findMailserver(Mail mail) {
        // TODO: Your lookup logic which server to use
        return new Mailserver(mail.server);
    }

    private static class Mail {
        String recipient;
        String server;
        SendMail sentBy;

        public Mail(String recipient, String server) {
            this.recipient = recipient;
            this.server = server;
        }

        @Override
        public String toString() {
            return "mail for " + recipient;
        }
    }

    public static class SendMail implements Runnable {

        private final Deque<Mail> queue;
        private final Mailserver mailserver;

        public SendMail(Mailserver mailserver, Deque<Mail> queue) {
            this.mailserver = mailserver;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!queue.isEmpty()) {
                Mail mail = queue.pollFirst();
                // TODO: Use SMTP to send the mail via mailserver
                System.out.println(this + " sent " + mail + " via " + mailserver);
                mail.sentBy = this;
            }
        }

    }

    public static class Mailserver {
        String hostname;

        public Mailserver(String hostname) {
            this.hostname = hostname;
        }

        @Override
        public String toString() {
            return hostname;
        }

        @Override
        public int hashCode() {
            return hostname.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return hostname.equals(((Mailserver) obj).hostname);
        }

    }

}

I agree with Adam, the use case sounds like JMS is overhead. Java built-in functionality sufficient:

package de.mhaller;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

import org.junit.Assert;
import org.junit.Test;

public class Mailer {

    @Test
    public void testMailer() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        ArrayList<Mail> log = new ArrayList<Mail>();
        LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>();

        // TODO: Put mails to be sent into the incoming queue
        incoming.offer(new Mail("foo1@localhost", "localhost"));
        incoming.offer(new Mail("foo2@otherhost", "otherhost"));
        incoming.offer(new Mail("foo3@otherhost", "otherhost"));
        incoming.offer(new Mail("foo4@localhost", "localhost"));

        Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>();
        while (!incoming.isEmpty()) {
            Mail mail = incoming.pollFirst();
            Mailserver mailserver = findMailserver(mail);
            if (!queues.containsKey(mailserver)) {
                ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>();
                queues.put(mailserver, serverQueue);
                executor.execute(new SendMail(mailserver, serverQueue));
            }
            Queue<Mail> slot = queues.get(mailserver);
            slot.offer(mail);
        }

        assertMailSentWithCorrectServer(log);
    }

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) {
        for (Mail mail : log) {
            if (!mail.server.equals(mail.sentBy.mailserver)) {
                Assert.fail("Mail sent by wrong server: " + mail);
            }
        }
    }

    private Mailserver findMailserver(Mail mail) {
        // TODO: Your lookup logic which server to use
        return new Mailserver(mail.server);
    }

    private static class Mail {
        String recipient;
        String server;
        SendMail sentBy;

        public Mail(String recipient, String server) {
            this.recipient = recipient;
            this.server = server;
        }

        @Override
        public String toString() {
            return "mail for " + recipient;
        }
    }

    public static class SendMail implements Runnable {

        private final Deque<Mail> queue;
        private final Mailserver mailserver;

        public SendMail(Mailserver mailserver, Deque<Mail> queue) {
            this.mailserver = mailserver;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!queue.isEmpty()) {
                Mail mail = queue.pollFirst();
                // TODO: Use SMTP to send the mail via mailserver
                System.out.println(this + " sent " + mail + " via " + mailserver);
                mail.sentBy = this;
            }
        }

    }

    public static class Mailserver {
        String hostname;

        public Mailserver(String hostname) {
            this.hostname = hostname;
        }

        @Override
        public String toString() {
            return hostname;
        }

        @Override
        public int hashCode() {
            return hostname.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return hostname.equals(((Mailserver) obj).hostname);
        }

    }

}
深爱不及久伴 2024-08-16 02:29:23

JMS 本身作为一个规范在这个问题上相当沉默。大多数实现都允许您执行此操作,只是不是通过 JMS 本身,而是使用它们自己的 API。但是您将无法将 MDB 之类的正式内容连接到动态队列。相反,您需要管理自己的连接和侦听器。

JMS itself as a spec is rather silent on the issue. Most implementations allow you to do this, just not through JMS itself, but using their own API. But you won't be able to hook up something formal like an MDB to a dynamic queue. Rather you'll need to manage your own connections and listeners.

白馒头 2024-08-16 02:29:23

上次我们在 WebSphere 环境中查看此问题时,动态创建队列非常困难/不可能(我认为临时队列对您来说太短暂了)。尽管存在用于创建队列的 API,但它们需要服务器重新启动才能激活。还有提到的 MDB 问题。

基于以下格言的肮脏的解决方法怎么样:所有问题都可以通过额外的间接级别来解决,它假设可用打印机的集合相对较小。

创建队列 Printer01 到 Printer99(或一些较小的数字)。拥有一个将队列映射到真实打印机的“数据库”。随着对打印机的请求出现,您可以添加到映射表中。您可能需要一些 MDB 开销来查看永远不会使用的队列,但除非您潜在的打印机数量很大,否则您可能负担得起吗?

The last time we looked at this in a WebSphere environment it was surprisingly difficult/impossible to create queues dynamically (temporary queues are too transient for you I think). Although APIs for creating queues existed they required a server restart afterwards to become active. Then there's the MDB issue allused to.

How about a dirty work-around based on the adage that all problems can be solved by an extra level of indirection, which assumes that the set of available printers is comparatively small.

Create Queues Printer01 to Printer99 (or some smaller number). Have a "database" which maps queues to real printers. As requests for printers come along you can add to the mapping table. You might have some overhead of MDBs looking at queues that will never be used, but unless your pootential number of printers is vast maybe you can afford it?

我们只是彼此的过ke 2024-08-16 02:29:23

为每个 SMTP 服务器创建一个队列,并将队列使用者(MDB 或消息侦听器)限制为 1

Create a queue for each of your SMTP sever and limit queue consumer(MDB or a message listener) to 1

遗忘曾经 2024-08-16 02:29:23

我已经使用 activemq 完成了此操作 - 实际上我当时对此发布了一个问题,因为我也有类似的担忧(当时的 JMS 文档声明不支持此功能)并确信它受到支持。

I've done this with activemq - I actually posted a question on this at the time, as I had similar concerns (the JMS documentation at the time stated that this was not supported) and was assured that it was supported.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文