我应该使用 Java 中的哪个线程池?

发布于 2024-09-09 09:51:40 字数 195 浏览 9 评论 0 原文

有大量的任务。 每个任务都属于一个组。要求是每组任务应该像在单线程中执行一样串行执行,并且在多核(或多CPU)环境中吞吐量应该最大化。注意:组的数量也与任务数量成正比。

最简单的解决方案是使用 ThreadPoolExecutor 并同步(或锁定)。然而,线程会互相阻塞并且吞吐量没有最大化。

还有更好的主意吗?或者是否存在满足要求的第三方库?

There are a huge amount of tasks.
Each task is belong to a single group. The requirement is each group of tasks should executed serially just like executed in a single thread and the throughput should be maximized in a multi-core (or multi-cpu) environment. Note: there are also a huge amount of groups that is proportional to the number of tasks.

The naive solution is using ThreadPoolExecutor and synchronize (or lock). However, threads would block each other and the throughput is not maximized.

Any better idea? Or is there exist a third party library satisfy the requirement?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

花辞树 2024-09-16 09:51:40

一种简单的方法是将所有组任务“连接”成一个超级任务,从而使子任务串行运行。但这可能会导致其他组延迟,除非其他组完全完成并在线程池中腾出一些空间,否则这些组不会启动。

作为替代方案,请考虑链接小组的任务。下面的代码说明了这一点:

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    private void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this);
        }       
    }

优点是不使用额外的资源(线程/队列),并且任务的粒度比简单方法中的更好。缺点是所有小组的任务都应该提前知道

--编辑--

为了使这个解决方案通用和完整,您可能需要决定错误处理(即,即使发生错误,链是否继续),并且实现 ExecutorService 并委托所有调用也是一个好主意到底层执行者。

A simple approach would be to "concatenate" all group tasks into one super task, thus making the sub-tasks run serially. But this will probably cause delay in other groups that will not start unless some other group completely finishes and makes some space in the thread pool.

As an alternative, consider chaining a group's tasks. The following code illustrates it:

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    private void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this);
        }       
    }

The advantage is that no extra resource (thread/queue) is being used, and that the granularity of tasks is better than the one in the naive approach. The disadvantage is that all group's tasks should be known in advance.

--edit--

To make this solution generic and complete, you may want to decide on error handling (i.e whether a chain continues even if an error occures), and also it would be a good idea to implement ExecutorService, and delegate all calls to the underlying executor.

毁梦 2024-09-16 09:51:40

我建议使用任务队列:

  • 对于每组任务,您都创建一个队列并将该组中的所有任务插入其中。
  • 现在,您的所有队列都可以并行执行,而一个队列内的任务则串行执行。

快速谷歌搜索表明 java api 本身没有任务/线程队列。然而,有很多关于编码的教程。如果您知道一些,每个人都可以随意列出好的教程/实现:

I would suggest to use task queues:

  • For every group of tasks You have create a queue and insert all tasks from that group into it.
  • Now all Your queues can be executed in parallel while the tasks inside one queue are executed serially.

A quick google search suggests that the java api has no task / thread queues by itself. However there are many tutorials available on coding one. Everyone feel free to list good tutorials / implementations if You know some:

乖乖公主 2024-09-16 09:51:40

我基本上同意 Dave 的答案,但是如果您需要在所有“组”之间划分 CPU 时间,即所有任务组都应该并行进行,您可能会发现这种构造很有用(使用删除作为“锁”)。这在我的情况是,尽管我认为它往往会使用更多内存):

class TaskAllocator {
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    public Queue<Runnable> lockTaskGroup(){
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup){
        entireWork.offer(taskGroup);
    }
 }

然后

 class DoWork implmements Runnable {
     private final TaskAllocator allocator;

     public DoWork(TaskAllocator allocator){
         this.allocator = allocator;
     }

     pubic void run(){
        for(;;){
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if(task==null){
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if(work == null){
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to 
            // the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
     }
 }

您可以使用最佳数量的线程来运行 DoWork 任务。这是一种循环负载平衡。

您甚至可以做一些更复杂的事情,方法是在 TaskAllocator 中使用它而不是简单的队列(剩余任务较多的任务组往往会被执行),

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator());

其中 复杂比较器

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
        int diff = o2.size() - o1.size();
        if(diff==0){
             //This is crucial. You must assign unique ids to your 
             //Subqueue and break the equality if they happen to have same size.
             //Otherwise your queues will disappear...
             return o1.id - o2.id;
        }
        return diff;
    }
 }

I mostly agree on Dave's answer, but if you need to slice CPU time across all "groups", i.e. all task groups should progress in parallel, you might find this kind of construct useful (using removal as "lock". This worked fine in my case although I imagine it tends to use more memory):

class TaskAllocator {
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    public Queue<Runnable> lockTaskGroup(){
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup){
        entireWork.offer(taskGroup);
    }
 }

and

 class DoWork implmements Runnable {
     private final TaskAllocator allocator;

     public DoWork(TaskAllocator allocator){
         this.allocator = allocator;
     }

     pubic void run(){
        for(;;){
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if(task==null){
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if(work == null){
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to 
            // the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
     }
 }

You can then use optimum number of threads to run the DoWork task. It's kind of a round robin load balance..

You can even do something more sophisticated, by using this instead of a simple queue in TaskAllocator (task groups with more task remaining tend to get executed)

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator());

where SophisticatedComparator is

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
        int diff = o2.size() - o1.size();
        if(diff==0){
             //This is crucial. You must assign unique ids to your 
             //Subqueue and break the equality if they happen to have same size.
             //Otherwise your queues will disappear...
             return o1.id - o2.id;
        }
        return diff;
    }
 }
眼泪都笑了 2024-09-16 09:51:40

Actor 也是针对此类特定问题的另一种解决方案。
Scala 有 actor,也有 Java,由 AKKA 提供。

Actor is also another solution for this specified type of issues.
Scala has actors and also Java, which provided by AKKA.

情独悲 2024-09-16 09:51:40

我遇到了与您类似的问题,我使用了与 Executor 配合使用的 ExecutorCompletionService 来完成任务集合。
以下是 Java7 以来 java.util.concurrent API 的摘录:

假设您有一组针对某个问题的求解器,每个求解器都返回某种 Result 类型的值,并且希望同时运行它们,以某种方法处理每个返回非空值的结果使用(结果 r)。您可以将其写为:

void solve(Executor e, Collection<Callable<Result>> solvers)
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null)
            use(r);
    }
}

因此,在您的场景中,每个任务都将是一个 Callable,并且任务将分组在 Collection>Collection>.

参考:
http://docs.oracle.com/javase /7/docs/api/java/util/concurrent/ExecutorCompletionService.html

I had a problem similar to your, and I used an ExecutorCompletionService that works with an Executor to complete collections of tasks.
Here is an extract from java.util.concurrent API, since Java7:

Suppose you have a set of solvers for a certain problem, each returning a value of some type Result, and would like to run them concurrently, processing the results of each of them that return a non-null value, in some method use(Result r). You could write this as:

void solve(Executor e, Collection<Callable<Result>> solvers)
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null)
            use(r);
    }
}

So, in your scenario, every task will be a single Callable<Result>, and tasks will be grouped in a Collection<Callable<Result>>.

Reference:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文