Is adding tasks to the BlockingQueue of a ThreadPoolExecutor advisable?
The JavaDoc for ThreadPoolExecutor is unclear on whether it is acceptable to add tasks directly to the BlockingQueue backing the executor. The docs say calling executor.getQueue() is "intended primarily for debugging and monitoring".
I'm constructing a ThreadPoolExecutor with my own BlockingQueue. I retain a reference to the queue so I can add tasks to it directly. The same queue is returned by getQueue(), so I assume the admonition in getQueue() applies to a reference to the backing queue acquired through my means.
Example
General pattern of the code is:
    int n = ...; // number of threads
    queue = new ArrayBlockingQueue<Runnable>(queueSize);
    executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
    executor.prestartAllCoreThreads();
    // ...
    while (...) {
        Runnable job = ...;
        queue.offer(job, 1, TimeUnit.HOURS);
    }
    while (jobsOutstanding.get() != 0) {
        try {
            Thread.sleep(...);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    executor.shutdownNow();
queue.offer() vs executor.execute()
As I understand it, the typical use is to add tasks via executor.execute(). The approach in my example above has the benefit of blocking on the queue, whereas execute() fails immediately if the queue is full and rejects my task. I also like that submitting jobs interacts with a blocking queue; this feels more "pure" producer-consumer to me.
An implication of adding tasks to the queue directly: I must call prestartAllCoreThreads(), otherwise no worker threads are running. Assuming no other interactions with the executor, nothing will be monitoring the queue (examination of the ThreadPoolExecutor source confirms this). This also implies that, for direct enqueuing, the ThreadPoolExecutor must additionally be configured with > 0 core threads and mustn't be configured to allow core threads to time out.
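To make the "execute() fails immediately" point concrete, here is a minimal sketch, assuming the default rejection policy (AbortPolicy) and a deliberately tiny pool. The class and method names are made up for illustration; they are not from the question's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    /** Returns true if execute() was rejected while the pool was saturated. */
    static boolean executeRejectsWhenFull() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, queue);
        CountDownLatch release = new CountDownLatch(1);
        // A job that parks its thread until the latch is released.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocker); // occupies the single worker thread
            executor.execute(blocker); // fills the single queue slot
            try {
                executor.execute(blocker); // no free thread, no free slot
                return false;
            } catch (RejectedExecutionException expected) {
                return true; // the default AbortPolicy throws instead of waiting
            }
        } finally {
            release.countDown(); // let the blocked jobs finish
            executor.shutdown();
        }
    }
}
```

In the same saturated state, a timed queue.offer(job, 1, TimeUnit.HOURS) would wait for a free slot rather than throw, which is the difference the question relies on.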
tl;dr
Given a ThreadPoolExecutor configured as follows:
- core threads > 0
- core threads aren't allowed to time out
- core threads are prestarted
- a reference is held to the BlockingQueue backing the executor
Is it acceptable to add tasks directly to the queue instead of calling executor.execute()?
Related
This question (producer/consumer work queues) is similar, but doesn't specifically cover adding to the queue directly.
Comments (5)
If it were me, I would prefer using Executor#execute() over Queue#offer(), simply because I'm using everything else from java.util.concurrent already.
Your question is a good one, and it piqued my interest, so I took a look at the source for ThreadPoolExecutor#execute(). We can see that execute() itself calls offer() on the work queue, but not before doing some nice, tasty pool manipulations if necessary. For that reason, I'd think that it'd be advisable to use execute(); not using it may (although I don't know for certain) cause the pool to operate in a non-optimal way. However, I don't think that using offer() will break the executor: it looks like tasks are pulled off the queue by the getTask() method (also in ThreadPoolExecutor). That method is just called from within a loop, so if the executor's not shutting down, it blocks until a new task is given to the queue (regardless of where it came from).
Note: Even though I've referred to the source here, we can't rely on it for a definitive answer; we should only be coding to the API. We don't know how the implementation of execute() will change over time.
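One observable consequence of skipping execute() can be checked without reading the source. The sketch below (names are hypothetical) offers a task straight to the queue and reports whether it ran; it assumes a fixed-size pool whose core threads do not time out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectOfferDemo {
    /** Offers one task straight to the queue; returns true if it ran within the timeout. */
    static boolean taskRuns(boolean prestart) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(4);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.HOURS, queue);
        CountDownLatch ran = new CountDownLatch(1);
        try {
            if (prestart) {
                executor.prestartAllCoreThreads(); // idle workers now wait on the queue
            }
            queue.offer(ran::countDown); // bypasses execute(), so no worker is started here
            return ran.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With prestart set to true the task runs; with false it sits in the queue because nothing ever created a worker thread to take it.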
One trick is to implement a custom subclass of ArrayBlockingQueue and override the offer() method to call your blocking version; then you can still use the normal code path.
(As you can probably guess, I think calling offer() directly on the queue as your normal code path is probably a bad idea.)
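A minimal sketch of that subclass trick might look like the following. BlockingOfferQueue and BlockingOfferDemo are hypothetical names, and delegating offer() to put() is just one way to get a "blocking version".

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical queue whose non-timed offer() waits for space instead of returning false. */
class BlockingOfferQueue<E> extends ArrayBlockingQueue<E> {
    BlockingOfferQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E element) {
        try {
            put(element); // blocks until a slot is free
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // execute() treats a failed offer as a full queue
        }
    }
}

class BlockingOfferDemo {
    /** Submits `tasks` trivial jobs through execute() and returns how many completed. */
    static int runAll(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 1, TimeUnit.HOURS, new BlockingOfferQueue<Runnable>(1));
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::incrementAndGet); // waits here while the queue is full
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```

Two caveats with this design: because offer() no longer fails, the pool never grows past its core size, and other methods built on offer(), such as add(), would presumably block as well.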
One can actually configure the behavior of the pool when the queue is full by specifying a RejectedExecutionHandler at instantiation. ThreadPoolExecutor defines four policies as inner classes: AbortPolicy, DiscardOldestPolicy, DiscardPolicy, and my personal favorite, CallerRunsPolicy, which runs the new job in the controlling thread (the one that called execute()).
The behavior desired in the question (blocking when the queue is full) can be obtained with a custom RejectedExecutionHandler.
At some point the queue must be accessed. The best place to do so is in a self-contained RejectedExecutionHandler, which saves any code duplication or potential bugs arising from direct manipulation of the queue at the scope of the pool object. Note that the handlers included in ThreadPoolExecutor themselves use getQueue().
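As a rough sketch of such a handler (the class names here are made up, and this is one possible reading of the approach rather than the answer's original code), a rejected task can be put back on the queue with a blocking call:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical handler: on saturation, wait for queue space instead of throwing. */
class BlockWhenFullHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("executor is shut down");
        }
        try {
            executor.getQueue().put(task); // blocks the submitting thread until a slot frees up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for queue space", e);
        }
    }
}

class RejectionDemo {
    /** Submits `tasks` trivial jobs through execute() and returns how many completed. */
    static int runAll(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 1, TimeUnit.HOURS,
                new ArrayBlockingQueue<Runnable>(1),
                new BlockWhenFullHandler()); // or: new ThreadPoolExecutor.CallerRunsPolicy()
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::incrementAndGet);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```

The isShutdown() check is only a best-effort guard: a task put on the queue just as the pool shuts down could still be left unexecuted, so treat this as a starting point.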
It's a very important question if the queue you're using is a completely different implementation from the standard in-memory LinkedBlockingQueue or ArrayBlockingQueue.
For instance, if you're implementing the producer-consumer pattern with several producers on different machines and use a queuing mechanism based on a separate persistence subsystem (like Redis), then the question becomes relevant on its own, even if you don't want a blocking offer() like the OP.
So the given answer, that prestartAllCoreThreads() has to be called (or prestartCoreThread() enough times) for the worker threads to be available and running, is important enough to be stressed.
If required, we can also use a parking lot, which separates main processing from rejected tasks.
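One way to read the "parking lot" idea, sketched here under my own assumptions (the names and the choice to drain the lot on the calling thread are illustrative), is a RejectedExecutionHandler that sets rejected tasks aside in a separate collection for later processing:

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ParkingLotDemo {
    /** Returns {number of parked tasks, total tasks run after draining the lot}. */
    static int[] submit(int tasks) {
        Queue<Runnable> parkingLot = new ConcurrentLinkedQueue<>();
        // Rejected tasks are parked instead of being dropped or throwing.
        RejectedExecutionHandler parkRejected = (task, pool) -> parkingLot.add(task);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.HOURS, new ArrayBlockingQueue<Runnable>(1), parkRejected);
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);
        // Jobs wait on the gate so the pool stays saturated during submission.
        Runnable job = () -> {
            try {
                gate.await();
                ran.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < tasks; i++) {
            executor.execute(job); // one runs, one queues, the rest are parked
        }
        int parked = parkingLot.size();
        gate.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Main processing is finished; now handle the parked tasks separately.
        for (Runnable r; (r = parkingLot.poll()) != null; ) {
            r.run();
        }
        return new int[] {parked, ran.get()};
    }
}
```

With one worker and a one-slot queue, submitting five gated jobs parks three of them; they run only after the main batch has finished.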