Java 中自适应线程池的设计注意事项

发布于 2024-07-27 05:13:46 字数 799 浏览 3 评论 0原文

我想用 Java 实现一个线程池,它可以根据提交给它的任务的计算和 I/O 行为动态调整自身大小。

实际上,我想实现与 C# 4.0 中的新线程池实现

是否已经有实现,或者我可以通过使用大部分现有的并发实用程序(例如 CachedThreadPool)来实现此行为吗?

C# 版本进行自我检测以实现最佳利用率。 Java 中提供了哪些自检测功能以及存在哪些性能影响?

是否可以采用协作方法,让任务发出其意图(例如进入 I/O 密集型操作、进入 CPU 密集型操作阶段)?

欢迎任何建议。

编辑 基于评论:

目标场景可能是:

  • 本地文件爬取和处理
  • Web 爬取
  • 多 Web 服务访问和聚合

CachedThreadPool 的问题是,当所有现有线程都被阻塞时,它会启动新线程 - 您需要对其设置明确的界限,但仅此而已。

例如,我要连续访问 100 个 Web 服务。 如果我创建一个 100 个 CTP,它将启动 100 个线程来执行操作,大量的 I/O 请求和数据传输肯定会互相绊倒。 对于静态测试用例,我将能够进行实验并找出最佳池大小,但我希望它能够以某种方式自适应地确定和应用。

I would like to implement a thread pool in Java, which can dynamically resize itself based on the computational and I/O behavior of the tasks submitted to it.

Practically, I want to achieve the same behavior as the new Thread Pool implementation in C# 4.0

Is there an implementation already or can I achieve this behavior by using mostly existing concurrency utilities (e.g. CachedThreadPool)?

The C# version does self instrumentation to achieve an optimal utilization. What self instrumentation is available in Java and what performance implications do the present?

Is it feasible to do a cooperative approach, where the task signals its intent (e.g. entering I/O intensive operation, entering CPU intensive operation phase)?

Any suggestions are welcome.

Edit Based on comments:

The target scenarios could be:

  • Local file crawling and processing
  • Web crawling
  • Multi-webservice access and aggregation

The problem of the CachedThreadPool is that it starts new threads when all existing threads are blocked - you need to set explicit bounds on it, but that's it.

For example, I have 100 web services to access in a row. If I create a 100 CTP, it will start 100 threads to perform the operation, and the ton of multiple I/O requests and data transfer will surely stumble upon each others feet. For a static test case I would be able to experiment and find out the optimal pool size, but I want it to be adaptively determined and applied in a way.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

酒几许 2024-08-03 05:13:46

考虑创建一个Map,其中关键是瓶颈资源。

提交到池的每个线程都会提交一个资源,这是它的瓶颈,即“CPU”、“网络”、“C:\”等。

您可以从每个资源只允许一个线程开始,然后可能慢慢增加直到工作完成率停止增加。 像 CPU 这样的东西可能有核心数量的下限。

Consider creating a Map where the key is the bottleneck resource.

Each thread submitted to the pool will submit a resource which is it's bottleneck, ie "CPU", "Network", "C:\" etc.

You could start by allowing only one thread per resource and then maybe slowly ramp up until work completion rate stops increasing. Things like CPU could have a floor of the core count.

叹沉浮 2024-08-03 05:13:46

让我提出一种替代方法。 拥有单个线程池是一个很好的抽象,但它的性能不是很好,特别是当作业非常受 IO 限制时 - 那么没有好的方法来调整它,很容易扩大池大小以最大化 IO 吞吐量,但你会遭受痛苦相反

,我建议查看 Apache MINA 框架的架构以获取灵感。 (http://mina.apache.org/) 这是一个高性能的 Web 框架 - 他们这样描述它作为一个服务器框架,但我认为他们的架构也适用于相反的场景,例如蜘蛛和多服务器客户端。 (实际上,您甚至可以在您的项目中开箱即用。)

它们使用 Java NIO(非阻塞 I/O)库来执行所有 IO 操作,并将工作划分为两个线程池:一组较小且快速的套接字线程,以及一组较大且较慢的业务逻辑线程。 因此各层如下所示:

  • 在网络端,有一大堆 NIO 通道,每个通道都有一个消息缓冲区
  • 一个小型套接字线程池,它们循环遍历通道列表。 他们唯一的工作是检查套接字,并将所有数据移出到消息缓冲区中 - 如果消息完成,则将其关闭并传输到作业队列。 这些家伙速度很快,因为他们只是推送位,并跳过 IO 上阻塞的任何套接字。
  • 序列化所有消息的单个作业队列
  • 大型处理线程池,将消息从队列中拉出,解析它们,并执行所需的任何处理。

这带来了非常好的性能 - IO 被分离到自己的层中,您可以调整套接字线程池以最大化 IO 吞吐量,并单独调整处理线程池以控制 CPU/资源利用率。

Let me present an alternative approach. Having a single thread pool is a nice abstraction, but it's not very performant, especially when the jobs are very IO-bound - then there's no good way to tune it, it's tempting to blow up the pool size to maximize IO throughput but you suffer from too many thread switches, etc.

Instead I'd suggest looking at the architecture of the Apache MINA framework for inspiration. (http://mina.apache.org/) It's a high-performance web framework - they describe it as a server framework, but I think their architecture works well for inverse scenarios as well, like spidering and multi-server clients. (Actually, you might even be able to use it out-of-the-box for your project.)

They use the Java NIO (non-blocking I/O) libraries for all IO operations, and divide up the work into two thread pools: a small and fast set of socket threads, and a larger and slower set of business logic threads. So the layers look as follows:

  • On the network end, a large set of NIO channels, each with a message buffer
  • A small pool of socket threads, which go through the channel list round-robin. Their only job is to check the socket, and move any data out into the message buffer - and if the message is done, close it out and transfer to the job queue. These guys are fast, because they just push bits around, and skip any sockets that are blocked on IO.
  • A single job queue that serializes all messages
  • A large pool of processing threads, which pull messages off the queue, parse them, and do whatever processing is required.

This makes for very good performance - IO is separated out into its own layer, and you can tune the socket thread pool to maximize IO throughput, and separately tune the processing thread pool to control CPU/resource utilization.

乱了心跳 2024-08-03 05:13:46

给出的示例是

Result[] a = new Result[N];
for(int i=0;i<N;i++) {
    a[i] = compute(i);
}

在 Java 中将其并行化到每个空闲核心并动态分配工作负载的方法,因此如果一个任务比另一个任务花费更长的时间并不重要。

// defined earlier
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(proc);

// main loop.
Future<Result>[] f = new Future<Result>[N];
for(int i = 0; i < N; i++) {
    final int i2 = i;
    a[i] = service.submit(new Callable<Result>() {
        public Result call() {
            return compute(i2);
        }
    }
}
Result[] a = new Result[N];
for(int i = 0; i < N; i++) 
    a[i] = f[i].get();

这在过去 5 年里没有太大变化,所以它不像刚推出时那么酷。 Java真正缺少的是闭包。 如果确实存在问题,您可以使用 Groovy。

附加:如果您关心性能,而不是作为示例,您可以并行计算斐波那契,因为它是一个很好的函数示例,如果您单线程计算它,速度会更快。

一个区别是每个线程池只有一个队列,因此不需要窃取工作。 这可能意味着每个任务的开销更多。 然而,只要您的任务通常需要超过 10 微秒,那就没有关系。

The example given is

Result[] a = new Result[N];
for(int i=0;i<N;i++) {
    a[i] = compute(i);
}

In Java the way to paralellize this to every free core and have the work load distributed dynamically so it doesn't matter if one task takes longer than another.

// defined earlier
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(proc);

// main loop.
Future<Result>[] f = new Future<Result>[N];
for(int i = 0; i < N; i++) {
    final int i2 = i;
    a[i] = service.submit(new Callable<Result>() {
        public Result call() {
            return compute(i2);
        }
    }
}
Result[] a = new Result[N];
for(int i = 0; i < N; i++) 
    a[i] = f[i].get();

This hasn't changed much in the last 5 years, so its not as cool as it was when it was first available. What Java really lacks is closures. You can use Groovy instead if that is really a problem.

Additional: If you cared about performance, rather than as an example, you would calculate Fibonacci in parallel because its a good example of a function which is faster if you calculate it single threaded.

One difference is that each thread pool only has one queue, so there is no need to steal work. This potentially means that you have more overhead per task. However, as long as your tasks typically take more than about 10 micro-seconds it won't matter.

逆流 2024-08-03 05:13:46

我认为您应该以特定于平台的方式监控 CPU 利用率。 找出您有多少个 CPU/核心,并监控负载。 当您发现负载较低,并且仍有更多工作时,请创建新线程 - 但不超过 num-cpus 的 x 倍(例如,x=2)。

如果您确实还想考虑 IO 线程,请尝试找出当池耗尽时每个线程处于什么状态,并从总数中扣除所有等待线程。 然而,一个风险是你会因为承认太多任务而耗尽记忆。

I think you should monitor CPU utilization, in a platform-specific manner. Find out how many CPUs/cores you have, and monitor the load. When you find that the load is low, and you still have more work, create new threads - but not more than x times num-cpus (say, x=2).

If you really want to consider IO threads also, try to find out what state each thread is in when your pool is exhausted, and deduct all waiting threads from the total number. One risk is that you exhaust memory by admitting too many tasks, though.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文