Java中的生产者/消费者模式

发布于 2024-11-29 08:02:54 字数 1107 浏览 2 评论 0原文

我正在考虑如何在Java中实现生产者/消费者模式。

假设我有 3 个线程和一个包含任务的列表(假设大约有 5 个任务)。每个线程从列表中获取任务并并发执行。我当前的方法是使用 CountDownLatch

int N = 3;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
ConcurrentLinkedQueue<String> tasks = new ConcurrentLinkedQueue<String>();

main() {
    for (int i=0;i<N;i++) {
        new Thread(new Worker()).start();
    }
    startSignal.countDown();
    doneSignal.await();
    System.out.println("done");
}

class Worker implements Runnable {
    public void run() {
        startSignal.await();
            while ((s = tasks.poll()) != null) {
                // do lengthy task here
                if (task failed) {
                    tasks.add(s);
                    return; // assume that task fails badly and have to stop the thread
                }
            }
        doneSignal.countDown();
    }
}

我想要实现的是,如果线程在处理任务时失败,它将被添加回任务列表中,以便由当前或任何其他线程再次拾取,但我当前的方法使用CountDownLatch 显然不可能这样做,因为在调用 didSignal.countDown() 后,线程假设它已经完成了任务。

对于这种情况,最好的方法是什么?使用 Executor 是唯一的方法吗?

I'm thinking how to implement the producer/consumer pattern in Java.

Assume that I have 3 threads and a List containing tasks (say it's about 5 tasks). Each thread grab the task from the list and execute it concurrently. My current approach is to use CountDownLatch

int N = 3;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
ConcurrentLinkedQueue<String> tasks = new ConcurrentLinkedQueue<String>();

main() {
    for (int i=0;i<N;i++) {
        new Thread(new Worker()).start();
    }
    startSignal.countDown();
    doneSignal.await();
    System.out.println("done");
}

class Worker implements Runnable {
    public void run() {
        startSignal.await();
            while ((s = tasks.poll()) != null) {
                // do lengthy task here
                if (task failed) {
                    tasks.add(s);
                    return; // assume that task fails badly and have to stop the thread
                }
            }
        doneSignal.countDown();
    }
}

what I wanted to achieve is that if a thread fails when processing a task, it will be added back to the task list to be picked up again by current or any other thread, but with my current approach using CountDownLatch obviously it is not possible to do so because after doneSignal.countDown() is called, the thread assumes it already have finished the task.

What would be the best approach for this scenario? Is using Executor the only way?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

疯狂的代价 2024-12-06 08:02:54

我想说,对于这种情况,这是一个过于复杂(并且容易出错)的解决方案,使用通用的 BlockingQueue(从该阻塞队列进行单线程轮询并将作业移交给 ExecutorService)确实会更简单。

在这种情况下看不出您需要 CountDownLatch 的任何原因,它只是不必要地使您的工作人员变得复杂,它必须了解它正在线程环境中运行,并且还必须在完成时清理所有脏东西。 BlockingQueues 和 ExecutorServices 正是为了让您摆脱这些问题。

I'd say this is an overly complicated (and prone to errors) solution for this case, it would be really simpler to use a common BlockingQueue, a single thread polling from this blocking queue and handing over jobs to an ExecutorService.

Can't see any reason why you would need a CountDownLatch in this case, it just unnecessarily complicates your worker, that has to understand that it is running in a threaded environment and also has to clean up whatever is dirty when it finishes. BlockingQueues and ExecutorServices are there exactly to get you out of these issues.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文