使用队列的生产者/消费者线程
我想创建某种生产者/消费者
线程应用程序。但我不确定在两者之间实现队列的最佳方法是什么。
所以我有两个想法(这两个想法都可能是完全错误的)。我想知道哪个更好,如果它们都很糟糕那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个 Queue 类,它是一个内部类并且是线程安全的。下面是两个示例,每个示例有 4 个类。
主类-
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
消费者类-
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
生产者类-
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
队列类
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
- OR
主类-
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
消费者类-
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
生产者类-
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
队列类
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
-开始吧!
I'd like to create some sort of Producer/Consumer
threading app. But I'm not sure what the best way to implement a queue between the two.
So I've some up with two ideas (both of which could be entirely wrong). I would like to know which would be better and if they both suck then what would be the best way to implement the queue. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in house class and is thread safe. Below are two examples with 4 classes each.
Main class-
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Consumer class-
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
Producer class-
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
Queue class-
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
OR
Main class-
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
Consumer class-
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
Producer class-
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
Queue class-
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
And go!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(8)
Java 5+ 拥有完成此类任务所需的所有工具。您将需要:
ExecutorService
;阻塞队列
。我对(3)说“如果有必要”,因为根据我的经验,这是一个不必要的步骤。您所做的就是向消费者执行器服务提交新任务。所以:
所以
生产者
直接提交给消费者
。Java 5+ has all the tools you need for this kind of thing. You will want to:
ExecutorService
;ExecutorService
;BlockingQueue
.I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:
So the
producers
submit directly toconsumers
.好的,正如其他人指出的,最好的办法是使用 java.util.concurrent 包。我强烈推荐《Java并发实践》。这是一本很棒的书,几乎涵盖了您需要了解的所有内容。
至于您的特定实现,正如我在评论中指出的那样,不要从构造函数启动线程 - 它可能是不安全的。
撇开这一点不谈,第二种实现似乎更好。您不想将队列放入静态字段中。您可能只是白白失去了灵活性。
如果您想继续自己的实现(我猜是出于学习目的?),请至少提供一个
start()
方法。您应该构造该对象(您可以实例化Thread
对象),然后调用start()
来启动线程。编辑:ExecutorService 有自己的队列,因此这可能会令人困惑。这里有一些可以帮助您入门的内容。
进一步编辑:
对于生产者,您可以执行以下操作而不是
while(true)
:这样您就可以通过调用
.shutdownNow()
来关闭执行器。如果您使用while(true)
,它不会关闭。另请注意,
Producer
仍然容易受到RuntimeExceptions
的影响(即一个RuntimeException
将停止处理)OK, as others note, the best thing to do is to use
java.util.concurrent
package. I highly recommend "Java Concurrency in Practice". It's a great book that covers almost everything you need to know.As for your particular implementation, as I noted in the comments, don't start Threads from Constructors -- it can be unsafe.
Leaving that aside, the second implementation seem better. You don't want to put queues in static fields. You are probably just loosing flexibility for nothing.
If you want to go ahead with your own implementation (for learning purpose I guess?), supply a
start()
method at least. You should construct the object (you can instantiate theThread
object), and then callstart()
to start the thread.Edit:
ExecutorService
have their own queue so this can be confusing.. Here's something to get you started.Further EDIT:
For producer, instead of
while(true)
, you can do something like:This way you can shutdown the executor by calling
.shutdownNow()
. If you'd usewhile(true)
, it won't shutdown.Also note that the
Producer
is still vulnerable toRuntimeExceptions
(i.e. oneRuntimeException
will halt the processing)我已经扩展了 cletus 对工作代码示例的建议答案。
ExecutorService
(ces) 接受Consumer
任务。Producer
和Consumer
共享BlockingQueue
。Consumer
任务都可以消耗Producer
生成的数字代码:
输出:
注意。如果不需要多个生产者和消费者,请保留单个生产者和消费者。我添加了多个生产者和消费者,以在多个生产者和消费者之间展示 BlockingQueue 的功能。
I have extended cletus proposed answer to working code example.
ExecutorService
(pes) acceptsProducer
tasks.ExecutorService
(ces) acceptsConsumer
tasks.Producer
andConsumer
sharesBlockingQueue
.Producer
tasks generates different numbers.Consumer
tasks can consume number generated byProducer
Code:
output:
Note. If you don't need multiple Producers and Consumers, keep single Producer and Consumer. I have added multiple Producers and Consumers to showcase capabilities of BlockingQueue among multiple Producers and Consumers.
您正在重新发明轮子。
如果您需要持久性和其他企业功能,请使用 JMS (我建议ActiveMq)。
如果您需要快速内存队列,请使用 java 的 队列。
如果您需要支持 java 1.4 或更早版本,请使用 Doug Lea 的优秀 并发包。
You are reinventing the wheel.
If you need persistence and other enterprise features use JMS (I'd suggest ActiveMq).
If you need fast in-memory queues use one of the impementations of java's Queue.
If you need to support java 1.4 or earlier, use Doug Lea's excellent concurrent package.
这是一个非常简单的代码。
This is a very simple code.
BlockingQueue.java
Consumer.java
ProducerConsumer_Main.java
BlockingQueue.java
Consumer.java
ProducerConsumer_Main.java
将这种类型安全模式与毒丸一起使用:
Use this typesafe pattern with poison pills:
当
它是静态时它可以工作,但是当它是非静态时它不能正常工作。
如何修复它?
When
it is static it works, but when it is non-static it doesn't work properly.
How to fix it?