生产者/消费者工作队列

发布于 2024-08-21 01:06:12 字数 789 浏览 13 评论 0原文

我正在努力寻找实现处理管道的最佳方法。

我的生产者将工作提供给 BlockingQueue。在消费者方面,我轮询队列,将获得的内容包装在可运行任务中,然后将其提交给 ExecutorService。

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

这并不理想;当然,ExecutorService 有自己的队列,所以真正发生的情况是我总是完全排空工作​​队列并填充任务队列,随着任务完成,任务队列会慢慢清空。

我意识到我可以在生产者端对任务进行排队,但我真的不想这样做 - 我喜欢工作队列的间接/隔离是哑字符串;他们会发生什么真的不关制片人的事。恕我直言,强制生产者将 Runnable 或 Callable 排队会破坏抽象。

但我确实希望共享工作队列代表当前的处理状态。如果消费者不跟上,我希望能够阻止生产者。

我很想使用 Executors,但我觉得我正在与他们的设计作斗争。我可以喝部分 Kool-ade 饮料吗,还是必须整个喝下去?我拒绝排队任务是不是脑子有问题? (我怀疑我可以将 ThreadPoolExecutor 设置为使用 1 任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很恶心。)

建议?

I'm wrestling with the best way to implement my processing pipeline.

My producers feed work to a BlockingQueue. On the consumer side, I poll the queue, wrap what I get in a Runnable task, and submit it to an ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

This is not ideal; the ExecutorService has its own queue, of course, so what's really happening is that I'm always fully draining my work queue and filling the task queue, which slowly empties as the tasks complete.

I realize that I could queue tasks at the producer end, but I'd really rather not do that - I like the indirection/isolation of my work queue being dumb strings; it really isn't any business of the producer what's going to happen to them. Forcing the producer to queue a Runnable or Callable breaks an abstraction, IMHO.

But I do want the shared work queue to represent the current processing state. I want to be able to block the producers if the consumers aren't keeping up.

I'd love to use Executors, but I feel like I'm fighting their design. Can I partially drink the Kool-ade, or do I have to gulp it? Am I being wrong-headed in resisting queueing tasks? (I suspect I could set up ThreadPoolExecutor to use a 1-task queue and override it's execute method to block rather than reject-on-queue-full, but that feels gross.)

Suggestions?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

丢了幸福的猪 2024-08-28 01:06:12

我希望共享工作队列
代表当前处理
状态。

尝试使用共享的 BlockingQueue 并有从队列中取出工作项的工作线程池。

我希望能够阻止
如果消费者不是生产者
跟上。

ArrayBlockingQueueLinkedBlockingQueue 支持有界队列,这样它们就会在放置时阻塞当满的时候。使用阻塞 put( ) 方法确保生产者在队列已满时被阻塞。

这是一个粗略的开始。您可以调整工作人员数量和队列大小:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

I want the shared work queue to
represent the current processing
state.

Try using a shared BlockingQueue and have a pool of Worker threads taking work items off of the Queue.

I want to be able to block the
producers if the consumers aren't
keeping up.

Both ArrayBlockingQueue and LinkedBlockingQueue support bounded queues such that they will block on put when full. Using the blocking put() methods ensures that producers are blocked if the queue is full.

Here is a rough start. You can tune the number of workers and queue size:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
梦里寻她 2024-08-28 01:06:12

“查找可用的现有工作线程(如果存在),如果需要则创建一个,如果它们闲置则杀死它们。”

管理所有这些工人状态既不必要又危险。我会创建一个在后台不断运行的监视器线程,它的唯一任务是填充队列并生成消费者...为什么不让工作线程守护进程,这样它们一完成就死掉?如果将它们全部附加到一个 ThreadGroup,您可以动态调整池的大小...例如:

  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**

"find an available existing worker thread if one exists, create one if necessary, kill them if they go idle."

Managing all those worker states is as unnecessary as it is perilous. I would create one monitor thread that constantly runs in the background, who's only task is to fill up the queue and spawn consumers... why not make the worker threads daemons so they die as soon as they complete? If you attach them all to one ThreadGroup you can dynamically re-size the pool... for example:

  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**
不寐倦长更 2024-08-28 01:06:12

您可以让您的消费者直接执行 Runnable::run,而不是启动新线程。将此与具有最大大小的阻塞队列结合起来,我认为您会得到您想要的。您的消费者成为根据队列上的工作项内联执行任务的工作人员。他们只会以处理项目的速度将项目出队,因此当您的消费者停止消费时,您的生产者就会这样做。

You could have your consumer execute Runnable::run directly instead of starting a new thread up. Combine this with a blocking queue with a maximum size and I think that you will get what you want. Your consumer becomes a worker that is executing tasks inline based on the work items on the queue. They will only dequeue items as fast as they process them so your producer when your consumers stop consuming.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文