是否建议将任务添加到 ThreadPoolExecutor 的 BlockingQueue 中?

发布于 2024-10-30 18:22:52 字数 1976 浏览 1 评论 0原文

ThreadPoolExecutor 的 JavaDoc 不清楚是否可以接受将任务直接添加到支持执行器的 BlockingQueue 中。 文档说 调用 executor.getQueue()“主要用于调试和监视”。

我正在使用自己的 BlockingQueue 构造一个 ThreadPoolExecutor。我保留了对队列的引用,以便我可以直接向其中添加任务。 getQueue() 返回相同的队列,因此我假设 getQueue() 中的警告适用于对通过我的方式获取的后备队列的引用。

示例

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() vs executor.execute()

据我了解,典型用途是通过 executor 添加任务.execute()。上面示例中的方法具有阻塞队列的优点,而如果队列已满并拒绝我的任务,则 execute() 会立即失败。我还喜欢提交作业与阻塞队列交互;对我来说,这感觉更“纯粹”的生产者-消费者。

直接将任务添加到队列的含义:我必须调用 prestartAllCoreThreads() 否则没有工作线程正在运行。假设没有与执行器进行其他交互,则不会有任何东西监视队列(对 ThreadPoolExecutor 源代码的检查证实了这一点)。这也意味着对于直接排队,ThreadPoolExecutor 必须额外配置 > > 0 个核心线程,并且不得配置为允许核心线程超时。

tl;dr

给定一个 ThreadPoolExecutor 配置如下:

  • 核心线程 > 0 个
  • 核心线程不允许超时
  • 核心线程已预启动
  • 持有对支持执行程序的 BlockingQueue 的引用

是否可以将任务直接添加到队列而不是调用 executor.execute() ?

相关

这个问题(生产者/消费者工作队列)类似,但没有具体涉及添加直接到队列。

The JavaDoc for ThreadPoolExecutor is unclear on whether it is acceptable to add tasks directly to the BlockingQueue backing the executor. The docs say calling executor.getQueue() is "intended primarily for debugging and monitoring".

I'm constructing a ThreadPoolExecutor with my own BlockingQueue. I retain a reference to the queue so I can add tasks to it directly. The same queue is returned by getQueue() so I assume the admonition in getQueue() applies to a reference to the backing queue acquired through my means.

Example

General pattern of the code is:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() vs executor.execute()

As I understand it, the typical use is to add tasks via executor.execute(). The approach in my example above has the benefit of blocking on the queue whereas execute() fails immediately if the queue is full and rejects my task. I also like that submitting jobs interacts with a blocking queue; this feels more "pure" producer-consumer to me.

An implication of adding tasks to the queue directly: I must call prestartAllCoreThreads() otherwise no worker threads are running. Assuming no other interactions with the executor, nothing will be monitoring the queue (examination of ThreadPoolExecutor source confirms this). This also implies for direct enqueuing that the ThreadPoolExecutor must additionally be configured for > 0 core threads and mustn't be configured to allow core threads to timeout.

tl;dr

Given a ThreadPoolExecutor configured as follows:

  • core threads > 0
  • core threads aren't allowed to timeout
  • core threads are prestarted
  • hold a reference to the BlockingQueue backing the executor

Is it acceptable to add tasks directly to the queue instead of calling executor.execute()?

Related

This question ( producer/consumer work queues ) is similar, but doesn't specifically cover adding to the queue directly.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

叹梦 2024-11-06 18:22:52

如果是我,我会更喜欢使用 Executor#execute() 而不是 Queue#offer(),因为我使用的是 java.util 中的其他所有内容.concurrent 已经。

你的问题提得很好,它激起了我的兴趣,所以我看了一下 ThreadPoolExecutor#execute() 的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

我们可以看到执行本身调用了 offer() code> 放在工作队列上,但在必要时进行一些漂亮、美味的池操作之前。因此,我认为建议使用 execute();不使用它可能(尽管我不确定)导致池以非最佳方式运行。但是,我不认为使用 offer()破坏执行器 - 看起来任务是使用以下命令从队列中拉出的(也来自 ThreadPoolExecutor)

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

getTask() 方法只是从循环内调用,因此如果执行器没有关闭,它将阻塞,直到将新任务分配给队列(无论它来自哪里)。

注意:尽管我已经在此处发布了源代码片段,但我们不能依赖它们来获得明确的答案 - 我们应该只对 API 进行编码。我们不知道 execute() 的实现将如何随着时间的推移而改变。

If it were me, I would prefer using Executor#execute() over Queue#offer(), simply because I'm using everything else from java.util.concurrent already.

Your question is a good one, and it piqued my interest, so I took a look at the source for ThreadPoolExecutor#execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

We can see that execute itself calls offer() on the work queue, but not before doing some nice, tasty pool manipulations if necessary. For that reason, I'd think that it'd be advisable to use execute(); not using it may (although I don't know for certain) cause the pool to operate in a non-optimal way. However, I don't think that using offer() will break the executor - it looks like tasks are pulled off the queue using the following (also from ThreadPoolExecutor):

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

This getTask() method is just called from within a loop, so if the executor's not shutting down, it'd block until a new task was given to the queue (regardless of from where it came from).

Note: Even though I've posted code snippets from source here, we can't rely on them for a definitive answer - we should only be coding to the API. We don't know how the implementation of execute() will change over time.

我们的影子 2024-11-06 18:22:52

一个技巧是实现 ArrayBlockingQueue 的自定义子类并重写 Offer() 方法来调用您的阻塞版本,然后您仍然可以使用正常的代码路径。

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(正如您可能猜到的那样,我认为直接在队列上调用 Offer 作为您的正常代码路径可能是一个坏主意)。

One trick is to implement a custom subclass of ArrayBlockingQueue and to override the offer() method to call your blocking version, then you can still use the normal code path.

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(as you can probably guess, i think calling offer directly on the queue as your normal code path is probably a bad idea).

宛菡 2024-11-06 18:22:52

实际上,可以通过在实例化时指定 RejectedExecutionHandler 来配置队列已满时池的行为。 ThreadPoolExecutor定义了四个策略作为内部类,包括AbortPolicyDiscardOldestPolicyDiscardPolicy,以及我个人最喜欢的, CallerRunsPolicy,它在控制线程中运行新作业。

例如:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

问题中所需的行为可以使用以下方法获得:

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

在某些时候,必须访问队列。这样做的最佳位置是在独立的 RejectedExecutionHandler 中,它可以保存任何代码重复或因在池对象范围内直接操作队列而产生的潜在错误。请注意,ThreadPoolExecutor 中包含的处理程序本身使用 getQueue()。

One can actually configure behavior of the pool when the queue is full, by specifying a RejectedExecutionHandler at instantiation. ThreadPoolExecutor defines four policies as inner classes, including AbortPolicy, DiscardOldestPolicy, DiscardPolicy, as well as my personal favorite, CallerRunsPolicy, which runs the new job in the controlling thread.

For example:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

The behavior desired in the question can be obtained using something like:

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

At some point the queue must be accessed. The best place to do so is in a self-contained RejectedExecutionHandler, which saves any code duplication or potenial bugs arising from direct manipulation of the queue at the scope of the pool object. Note that the handlers included in ThreadPoolExecutor themselves use getQueue().

江南烟雨〆相思醉 2024-11-06 18:22:52

如果您使用的队列与标准内存中 LinkedBlockingQueueArrayBlockingQueue 的实现完全不同,那么这是一个非常重要的问题。

例如,如果您在不同机器上使用多个生产者来实现生产者-消费者模式,并使用基于单独持久性子系统(如 Redis)的排队机制,那么问题本身就变得相关,即使您不这样做想要一个像OP一样的阻塞offer()

因此,给定的答案,即必须调用 prestartAllCoreThreads() (或足够多次 prestartCoreThread())才能使工作线程可用并运行,这一点非常重要强调。

It's a very important question if the queue you're using is a completely different implementation from the standard in-memory LinkedBlockingQueue or ArrayBlockingQueue.

For instance if you're implementing the producer-consumer pattern using several producers on different machines, and use a queuing mechanism based on a separate persistence subsystem (like Redis), then the question becomes relevant on its own, even if you don't want a blocking offer() like the OP.

So the given answer, that prestartAllCoreThreads() has to be called (or enough times prestartCoreThread()) for the worker threads to be available and running, is important enough to be stressed.

比忠 2024-11-06 18:22:52

如果需要,我们还可以使用停车场将主要处理与拒绝的任务分开 -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();

If required, we can also use a parking lot which separates main processing from rejected tasks -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文