Java 电子邮件发送队列 - 固定数量的线程发送尽可能多的可用消息

发布于 2024-08-06 05:20:06 字数 782 浏览 7 评论 0原文

我正在编写一个消息处理应用程序(电子邮件),我想要一个传出队列。我的设计方式是拥有一个单例队列类 ThreadedQueueSender,由 Executor Service 和 BlockingQueue 支持。此外,javax.mail.Transport 对象的线程池用于获取和释放与传出 SMTP 服务器的连接。

此类公开了一个方法 add(MimeMessage),该方法将消息添加到工作队列 (BlockingQueue)。

在类实例化时,ExecutorService 被初始化为具有固定数量线程的 ThreadPoolExecutor,比如说 5 个。每个线程的 run()方法处于无限循环中,仅在检测到中断时退出(当调用 ExecutorService.shutdownNow() 时)。

此 run 方法使用 BlockingQueue.poll() 从工作队列中获取消息,直到没有更多消息可用,然后从连接池请求 Transport 对象,打开连接,发送其检索到​​的所有消息,关闭连接并返回 Transport 对象。

这是可行的,但我觉得我没有通过在应用程序的生命周期中运行固定数量的线程来充分利用 ExecutorService。另外,我自己管理工作队列,而不是让并发框架来处理它。其他人将如何实现此功能?将每个传入消息包装在 Runnable 中,然后执行发送逻辑是否更好?

谢谢,如有任何意见,我们将不胜感激。

瑞安

I'm writing a message processing application (email) that I want to have an outgoing queue. The way I've designed this is having a singleton queue class, ThreadedQueueSender, backed by an Executor Service and a BlockingQueue. Additionally, a thread pool of javax.mail.Transport objects is used to obtain and release connections to the outgoing SMTP server.

This class exposes a method, add(MimeMessage), that adds messages to the work queue (BlockingQueue).

At instantiation of the class, the ExecutorService is initialized to a ThreadPoolExecutor with a fixed number of threads, lets say 5. Each thread's run() method is in infinite loop that only exits when it detects interrupt (when ExecutorService.shutdownNow() is called).

This run method uses BlockingQueue.poll() to take messsages from the work queue until no more are available without blocking, then requests a Transport object from the connection pool, opens the connection, sends all the messages its retrieved, closes the connection and returns the Transport object.

This works, but I feel I am not taking full advantage of the ExecutorService by having a fixed number of threads that run for the life of the application. Also, I am managing the work queue myself instead of letting the concurrency frameworks handle it. How would others implement this functionality? Is it better to wrap each incoming message in a Runnable, then execute the sending logic?

Thank you, any comments are appreciated.

Ryan

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

被你宠の有点坏 2024-08-13 05:20:06

您应该为执行器服务应该完成的每项工作创建任务。

例如,您可以创建一个可调用的“MailSendingTask”来保存 MimeMessage 并包装邮件发送。通过将这些 MailSendingTasks 提交给您的执行器来对它们进行排队。现在您的执行器决定将创建多少个线程(通过设置下限和上限线程池边界进行配置)

您只需要创建 2 或 3 个类/接口

  • 一个 MailService 接口,它提供一个简单的 send(MimeMessage msg) 方法
  • 一个 MailServiceImplementation 类实现 MailService 并保存对已配置执行器的引用,
  • 其中一个类 MailSenderTask 实现可调用接口,该接口保存对 MimeMessage 对象的引用并执行邮件发送。

您甚至可以更进一步,创建一个额外的服务来管理 MailSenderTask 可以使用的邮件套接字连接。

如果你想添加“取消”,你应该查看类 Future 和 FutureTask

You should create tasks for every piece of work that should be done by your executor service.

For example you could create a callable "MailSendingTask" that holds the MimeMessage and wraps the mail sending. Queue these MailSendingTasks by submitting them to your executor. Now your Executor decides how many threads will be created (Config it by setting lower and upper thread pool bounds)

You only need to create 2 or 3 classes/interfaces

  • one MailService Interface that provides a simple send(MimeMessage msg) method
  • one MailServiceImplementation Class that implements MailService and holds a reference to a configured executor
  • one class MailSenderTask implementing the callable interface that holds a reference to the MimeMessage object and which does the mail sending.

You could even go futher by creating an extra service that manages the mail socket connections which could be used by the MailSenderTask.

If you want to add "cancelation" you should look at the classes Future and FutureTask

冷弦 2024-08-13 05:20:06

将消息包装在 Runnable 中会迫使您要么使工作队列不受限制,要么处理队列已满时发生的情况。 ThreadPoolExecutor 为您提供了一些处理这种情况的策略 - 请参阅 ThreadPoolExecutor javadoc 了解详细信息。 - 放弃,自己运行/丢弃一些东西

你可以做的另一件事是允许线程池创建超出其核心大小的线程,线程如何产生以及何时收获它们由 ThreadPoolExecutor 的前 4 个参数描述构造函数。这在现实中的效果如何取决于资源瓶颈。

另外,在您的情况下,BlockingQueue.pollBlockingQueue.take 有什么优势?两者都是可中断的,并且您的线程只有一项任务,因此阻塞并非不可取。

Wrapping up the messages in a Runnable would force you to either make the work queue unbounded or deal with what happens when the queue is full. ThreadPoolExecutor gives you a few policies for dealing with this situation - See ThreadPoolExecutor javadoc for details. - Give up, run it yourself / discard something

Another thing you can do is allow the thread pool to create threads beyond its core size, how threads are spawned and when they are reaped is described by the first 4 arguments to the ThreadPoolExecutor constructor. How well this works in reality will depend on the resource bottleneck.

Also, what's the advantage of BlockingQueue.poll in your situation, rather than BlockingQueue.take? Both are interruptible, and your thread only has one task, so blocking is not undesirable.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文