ThreadPoolExecutor 策略

发布于 2024-09-13 19:34:06 字数 733 浏览 16 评论 0原文

我正在尝试使用 ThreadPoolExecutor 来安排任务,但遇到了其策略的一些问题。以下是其声明的行为:

  1. 如果运行的线程少于 corePoolSize,则执行器始终倾向于添加新线程而不是排队。
  2. 如果 corePoolSize 或更多线程正在运行,Executor 总是更喜欢对请求进行排队而不是添加新线程。
  3. 如果请求无法排队,则会创建一个新线程,除非这超出了 MaximumPoolSize,在这种情况下,该任务将被拒绝。

我想要的行为是这样的:

  1. 与上面相同
  2. 如果正在运行的线程超过 corePoolSize 但少于 maxPoolSize,则更喜欢添加新线程而不是排队,并使用空闲线程而不是添加新线程。
  3. 与上面相同

基本上我不希望任何任务被拒绝;我希望它们在无界队列中排队。但我确实希望拥有最大数量的线程。如果我使用无界队列,它在达到 coreSize 后永远不会生成线程。如果我使用有界队列,它会拒绝任务。有什么办法解决这个问题吗?

我现在正在考虑的是在 SynchronousQueue 上运行 ThreadPoolExecutor,但不直接向其提供任务 - 而是将它们提供给单独的无界 LinkedBlockingQueue。然后另一个线程从 LinkedBlockingQueue 送入 Executor,如果一个线程被拒绝,它会再次尝试,直到没有被拒绝为止。不过,这看起来很痛苦,而且有点麻烦——有没有更干净的方法来做到这一点?

I'm trying to use a ThreadPoolExecutor to schedule tasks, but running into some problems with its policies. Here's its stated behavior:

  1. If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  2. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  3. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

The behavior I want is this:

  1. same as above
  2. If more than corePoolSize but less than maximumPoolSize threads are running, prefers adding a new thread over queuing, and using an idle thread over adding a new thread.
  3. same as above

Basically I don't want any tasks to be rejected; I want them to be queued in an unbounded queue. But I do want to have up to maximumPoolSize threads. If I use an unbounded queue, it never generates threads after it hits coreSize. If I use a bounded queue, it rejects tasks. Is there any way around this?

What I'm thinking about now is running the ThreadPoolExecutor on a SynchronousQueue, but not feeding tasks directly to it - instead feeding them to a separate unbounded LinkedBlockingQueue. Then another thread feeds from the LinkedBlockingQueue into the Executor, and if one gets rejected, it simply tries again until it is not rejected. This seems like a pain and a bit of a hack, though - is there a cleaner way to do this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

偏爱自由 2024-09-20 19:34:06

可能没有必要按照请求对线程池进行微观管理。

缓存的线程池将重用空闲线程,同时还允许潜在的无限并发线程。这当然可能导致突发期间上下文切换开销导致性能失控下降。

Executors.newCachedThreadPool();

更好的选择是对线程总数进行限制,同时放弃确保首先使用空闲线程的概念。配置更改将是:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

根据此场景推理,如果执行程序的线程数少于 corePoolSize,那么它一定不会很忙。如果系统不是很忙,那么启动一个新线程没什么坏处。这样做将导致您的 ThreadPoolExecutor 始终创建一个新的工作线程(如果它低于允许的最大工作线程数)。只有当最大数量的worker“运行”时,才会给空闲等待任务的worker分配任务。如果工作线程在没有任务的情况下等待aReasonableTimeDuration,则允许其终止。对池大小使用合理的限制(毕竟只有这么多 CPU)和相当大的超时(以防止线程不必要地终止),可能会看到所需的好处。

最后的选择是很黑客的。基本上,ThreadPoolExecutor 内部使用 BlockingQueue.offer 来确定队列是否有容量。 BlockingQueue 的自定义实现始终可以拒绝 offer 尝试。当ThreadPoolExecutor无法向队列提供任务时,它将尝试创建一个新的worker。如果无法创建新的工作线程,则会调用 RejectedExecutionHandler。此时,自定义 RejectedExecutionHandler 可以强制将 put 放入自定义 BlockingQueue 中。

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}

It probably isn't necessary to micro-manage the thread pool as being requested.

A cached thread pool will re-use idle threads while also allowing potentially unlimited concurrent threads. This of course could lead to runaway performance degrading from context switching overhead during bursty periods.

Executors.newCachedThreadPool();

A better option is to place a limit on the total number of threads while discarding the notion of ensuring idle threads are used first. The configuration changes would be:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

Reasoning over this scenario, if the executor has less than corePoolSize threads, than it must not be very busy. If the system is not very busy, then there is little harm in spinning up a new thread. Doing this will cause your ThreadPoolExecutor to always create a new worker if it is under the maximum number of workers allowed. Only when the maximum number of workers are "running" will workers waiting idly for tasks be given tasks. If a worker waits aReasonableTimeDuration without a task, then it is allowed to terminate. Using reasonable limits for the pool size (after all, there are only so many CPUs) and a reasonably large timeout (to keep threads from needlessly terminating), the desired benefits will likely be seen.

The final option is hackish. Basically, the ThreadPoolExecutor internally uses BlockingQueue.offer to determine if the queue has capacity. A custom implementation of BlockingQueue could always reject the offer attempt. When the ThreadPoolExecutor fails to offer a task to the queue, it will try to make a new worker. If a new worker can not be created, a RejectedExecutionHandler would be called. At that point, a custom RejectedExecutionHandler could force a put into the custom BlockingQueue.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}
北方的巷 2024-09-20 19:34:06

您的用例很常见,完全合法,但不幸的是比人们想象的要困难。有关背景信息,您可以阅读此讨论并找到指向解决方案的指针(也在线程中提到)此处。谢伊的解决方案效果很好。

一般来说,我会对无界队列有点警惕;通常最好有明确的传入流量控制,可以优雅地降级并调节当前/剩余工作的比率,以免压垮生产者或消费者。

Your use case is common, completely legit and unfortunately more difficult than one would expect. For background info you can read this discussion and find a pointer to a solution (also mentioned in the thread) here. Shay's solution works fine.

Generally I'd be a bit wary of unbounded queues; it's usually better to have explicit incoming flow control that degrades gracefully and regulates the ratio of current/remaining work to not overwhelm either producer or consumer.

初熏 2024-09-20 19:34:06

只需设置 corePoolsize = MaximumPoolSize 并使用无界队列?

在您的点列表中,1 排除 2,因为 corePoolSize 将始终小于或等于 maximumPoolSize

编辑

您想要的和 TPE 为您提供的之间仍然存在一些不兼容的地方。

如果您有一个无界队列,则 maximumPoolSize 会被忽略,因此,正如您所观察到的,将不会创建和使用超过 corePoolSize 的线程。

所以,再说一遍,如果您将 corePoolsize = MaximumPoolSize 与无界队列一起使用,您就得到了您想要的,不是吗?

Just set corePoolsize = maximumPoolSize and use an unbounded queue?

In your list of points, 1 excludes 2, since corePoolSize will always be less or equal than maximumPoolSize.

Edit

There is still something incompatible between what you want and what TPE will offer you.

If you have an unbounded queue, maximumPoolSize is ignored so, as you observed, no more than corePoolSize threads will ever be created and used.

So, again, if you take corePoolsize = maximumPoolSize with an unbounded queue, you have what you want, no?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文