Java 电子邮件发送队列 - 固定数量的线程发送尽可能多的可用消息
我正在编写一个消息处理应用程序(电子邮件),我想要一个传出队列。我的设计方式是拥有一个单例队列类 ThreadedQueueSender,由 Executor Service 和 BlockingQueue 支持。此外,javax.mail.Transport 对象的线程池用于获取和释放与传出 SMTP 服务器的连接。
此类公开了一个方法 add(MimeMessage)
,该方法将消息添加到工作队列 (BlockingQueue
)。
在类实例化时,ExecutorService
被初始化为具有固定数量线程的 ThreadPoolExecutor
,比如说 5 个。每个线程的 run()
方法处于无限循环中,仅在检测到中断时退出(当调用 ExecutorService.shutdownNow() 时)。
此 run 方法使用 BlockingQueue.poll()
从工作队列中获取消息,直到没有更多消息可用,然后从连接池请求 Transport
对象,打开连接,发送其检索到的所有消息,关闭连接并返回 Transport
对象。
这是可行的,但我觉得我没有通过在应用程序的生命周期中运行固定数量的线程来充分利用 ExecutorService。另外,我自己管理工作队列,而不是让并发框架来处理它。其他人将如何实现此功能?将每个传入消息包装在 Runnable 中,然后执行发送逻辑是否更好?
谢谢,如有任何意见,我们将不胜感激。
瑞安
I'm writing a message processing application (email) that I want to have an outgoing queue. The way I've designed this is having a singleton queue class, ThreadedQueueSender, backed by an Executor Service and a BlockingQueue. Additionally, a thread pool of javax.mail.Transport objects is used to obtain and release connections to the outgoing SMTP server.
This class exposes a method, add(MimeMessage)
, that adds messages to the work queue (BlockingQueue
).
At instantiation of the class, the ExecutorService
is initialized to a ThreadPoolExecutor
with a fixed number of threads, lets say 5. Each thread's run()
method is in infinite loop that only exits when it detects interrupt (when ExecutorService.shutdownNow()
is called).
This run method uses BlockingQueue.poll()
to take messsages from the work queue until no more are available without blocking, then requests a Transport
object from the connection pool, opens the connection, sends all the messages its retrieved, closes the connection and returns the Transport
object.
This works, but I feel I am not taking full advantage of the ExecutorService by having a fixed number of threads that run for the life of the application. Also, I am managing the work queue myself instead of letting the concurrency frameworks handle it. How would others implement this functionality? Is it better to wrap each incoming message in a Runnable, then execute the sending logic?
Thank you, any comments are appreciated.
Ryan
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您应该为执行器服务应该完成的每项工作创建任务。
例如,您可以创建一个可调用的“MailSendingTask”来保存 MimeMessage 并包装邮件发送。通过将这些 MailSendingTasks 提交给您的执行器来对它们进行排队。现在您的执行器决定将创建多少个线程(通过设置下限和上限线程池边界进行配置)
您只需要创建 2 或 3 个类/接口
您甚至可以更进一步,创建一个额外的服务来管理 MailSenderTask 可以使用的邮件套接字连接。
如果你想添加“取消”,你应该查看类 Future 和 FutureTask
You should create tasks for every piece of work that should be done by your executor service.
For example you could create a callable "MailSendingTask" that holds the MimeMessage and wraps the mail sending. Queue these MailSendingTasks by submitting them to your executor. Now your Executor decides how many threads will be created (Config it by setting lower and upper thread pool bounds)
You only need to create 2 or 3 classes/interfaces
You could even go futher by creating an extra service that manages the mail socket connections which could be used by the MailSenderTask.
If you want to add "cancelation" you should look at the classes Future and FutureTask
将消息包装在 Runnable 中会迫使您要么使工作队列不受限制,要么处理队列已满时发生的情况。
ThreadPoolExecutor
为您提供了一些处理这种情况的策略 - 请参阅 ThreadPoolExecutor javadoc 了解详细信息。 - 放弃,自己运行/丢弃一些东西你可以做的另一件事是允许线程池创建超出其核心大小的线程,线程如何产生以及何时收获它们由 ThreadPoolExecutor 的前 4 个参数描述构造函数。这在现实中的效果如何取决于资源瓶颈。
另外,在您的情况下,
BlockingQueue.poll
比BlockingQueue.take
有什么优势?两者都是可中断的,并且您的线程只有一项任务,因此阻塞并非不可取。Wrapping up the messages in a
Runnable
would force you to either make the work queue unbounded or deal with what happens when the queue is full.ThreadPoolExecutor
gives you a few policies for dealing with this situation - See ThreadPoolExecutor javadoc for details. - Give up, run it yourself / discard somethingAnother thing you can do is allow the thread pool to create threads beyond its core size, how threads are spawned and when they are reaped is described by the first 4 arguments to the
ThreadPoolExecutor
constructor. How well this works in reality will depend on the resource bottleneck.Also, what's the advantage of
BlockingQueue.poll
in your situation, rather thanBlockingQueue.take
? Both are interruptible, and your thread only has one task, so blocking is not undesirable.