生产者消费者 - 使用 Executors.newFixedThreadPool

发布于 2024-11-29 12:58:00 字数 350 浏览 3 评论 0原文

我对生产者-消费者模式的理解是,它可以使用生产者和消费者之间共享的队列来实现。生产者将工作提交到共享队列,消费者检索并处理它。也可以通过生产者直接提交给消费者来实现(生产者线程直接提交给消费者的执行器服务)。

现在,我一直在研究提供线程池的一些常见实现的 Executors 类。根据规范,方法 newFixedThreadPool“重用在共享无界队列上运行的固定数量的线程”。他们在这里谈论哪个队列?

如果Producer直接向consumer提交任务,是ExecutorService的内部队列包含了Runnables列表吗?

或者它是中间队列,以防生产者提交到共享队列?

可能我错过了整个要点,但有人可以澄清一下吗?

My understanding of a Producer-Consumer pattern is that it could be implemented using a queue shared between the producer and the consumer. Producer submits work to a shared queue, consumer retrieves it and processes it. It could also be implemented by the producer directly submitting to the consumer (Producer threads submitting to Consumer's executor service directly). 

Now, I've been looking at the Executors class that provides some common implementations of thread pools. The method newFixedThreadPool, according to the spec, "reuses a fixed number of threads operating off a shared unbounded queue". Which queue are they talking about here? 

If the Producer directly submits a task to a consumer, is it the internal queue of the ExecutorService that contains the list of Runnables?

Or is it the intermediate queue, in case the producer submits to a shared queue? 

May be I'm missing the whole point, but would someone please clarify?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

知足的幸福 2024-12-06 12:58:00

你是对的,ExecutorService不仅仅是一个线程池,而且它是一个完整的生产者-消费者实现。这个内部队列实际上是一个线程安全的 Runnable 队列(准确地说是 FutureTask),保存着您 submit() 的任务。

池中的所有线程都被阻塞在该队列上,等待任务执行。当您 submit() 一项任务时,只有一个线程会拾取该任务并运行它。当然,submit() 不会等待池中的线程完成处理。

另一方面,如果您提交大量任务(或长时间运行的任务),您可能最终会导致池中的所有线程都被占用,并且一些任务在队列中等待。一旦任何线程完成其任务,它将立即从队列中选择第一个线程。

You are right, ExecutorService is not only a thread pool, but it is a full Producer-Consumer implementation. This internal queue is in fact a thread-safe queue of Runnables (FutureTask to be precise) holding tasks you submit().

All the threads in the pool are blocked on that queue, waiting for tasks to be executed. When you submit() a task, exactly one thread will pick it up and run it. Of course submit() is not waiting for thread in the pool to finish processing.

On the other hand if you submit a huge number of tasks (or long-running ones) you might end-up with all threads in the pool being occupied and some tasks waiting in the queue. Once any thread is done with its task, it will immediately pick the first one from the queue.

旧人九事 2024-12-06 12:58:00
public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  
public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  
゛清羽墨安 2024-12-06 12:58:00

看看这个:
Java 中的生产者-消费者示例 (RabbitMQ) (它是为另一个库,但它是用 Java 编写的,它清楚地演示了这个概念;)
希望有帮助!

PS:实际上,它有几个例子,但你明白了;)

Check this out:
Producer-Consumer example in Java (RabbitMQ) (It's written for another library but it's in Java and it demonstrates clearly the concept ;)
Hope it helps!

P.S.:Actually, it has several examples but you get the idea ;)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文