生产者-消费者线程间通信
在线程间通信方面遇到问题,并通过到处使用“虚拟消息”来“解决”它。这是一个坏主意吗?可能的解决方案有哪些?
我遇到的示例问题。
主线程启动一个线程来处理记录并将其插入数据库。 主线程读取一个可能很大的文件,并将一个又一个记录(对象)放入阻塞队列中。处理线程从队列中读取并执行工作。
如何告诉“处理线程”停止? 队列可以为空,但工作尚未完成,并且当处理线程已完成工作并且无法中断它时,主线程现在也不会完成。
所以处理线程
while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) {
MyObject object= queue.poll(100, TimeUnit.MILLISECONDS);
if (object != null) {
String data = object.getData();
if (data.equals("END")) {
break;
}
// do work
}
}
// clean-up
synchronized queue) {
queue.notifyAll();
}
return;
和主线程
// ...start processing thread...
while(reader.hasNext(){
// ...read whole file and put data in queue...
}
MyObject dummy = new MyObject();
dummy.setData("END");
queue.put(dummy);
//Note: empty queue here means work is done
while (queue.size() > 0) {
synchronized (queue) {
queue.wait(500); // over-cautios locking prevention i guess
}
}
注意插入必须在同一个事务中并且事务不能被处理 通过主线程。
这样做更好的方法是什么? (我正在学习,不想开始“以错误的方式做事”)
having trouble with inter-thread communication and "solved" it by using "dummy messages" all over the place. Is this a bad idea? What are possible solutions?
Example Problem i have.
main thread starts a thread for processing and inserting records into database.
main thread reads a possibly huge file and puts one record (object) after another into a blockingqueue. processing thread reads from queue and does work.
How do I tell "processing thread" to stop?
Queue can be empty but work is not done and the main thread does not now either when processing thread has finished work and can't interrupt it.
So processing thread does
while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) {
MyObject object= queue.poll(100, TimeUnit.MILLISECONDS);
if (object != null) {
String data = object.getData();
if (data.equals("END")) {
break;
}
// do work
}
}
// clean-up
synchronized queue) {
queue.notifyAll();
}
return;
and main thread
// ...start processing thread...
while(reader.hasNext(){
// ...read whole file and put data in queue...
}
MyObject dummy = new MyObject();
dummy.setData("END");
queue.put(dummy);
//Note: empty queue here means work is done
while (queue.size() > 0) {
synchronized (queue) {
queue.wait(500); // over-cautios locking prevention i guess
}
}
Note that insertion must be in same transaction and transaction can't be handled
by main thread.
What would be a better way of doing this?
(I'm learning and don't want to start "doing it the wrong way")
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
这些虚拟消息是有效的。谓之“毒”。生产者发送给消费者以使其停止的东西。
另一种可能性是在主线程中的某个位置调用 Thread.interrupt() ,并在工作线程中相应地捕获并处理 InterruptedException 。
These dummy message is valid. It is called "poison". Something that the producer sends to the consumer to make it stop.
Other possibility is to call Thread.interrupt() somewhere in the main thread and catch and handle the InterruptedException accordingly, in the worker thread.
这不是一个坏主意,它被称为“毒丸”,是停止基于线程的服务的合理方法。
但只有当生产者和消费者的数量已知时,它才有效。
在您发布的代码中,有两个线程,一个是“主线程”,用于生成数据,另一个是“处理线程”,用于消耗数据,“毒丸”非常适合这种情况。
但想象一下,如果你还有其他生产者,消费者怎么知道什么时候停止(只有当所有生产者都发送“毒丸”时),你需要确切地知道所有生产者的数量,并检查“毒丸”的数量消费者中的“药片”,如果等于生产者的数量,则意味着所有生产者都停止工作,那么消费者也停止工作。
在“主线程”中,您需要捕获
InterruptedException
,因为如果没有,“主线程”可能无法设置“毒丸”。您可以像下面这样做,另外,您可以尝试使用
ExecutorService
来解决您的所有问题。(当你只需要做一些工作,然后在所有工作完成后停止时,它会起作用)
您可能需要阅读这本书:Java Concurrency in Practice。相信我,这是最好的。
It's not a bad idea, it's called "Poison Pills" and is a reasonable way to stop a thread-based service.
But it only works when the number of producers and consumers is known.
In code you posted, there are two threads, one is "main thread", which produces data, the other is "processing thread", which consumes data, the "Poison Pills" works well for this circumstance.
But to imagine, if you also have other producers, how does consumer know when to stop (only when all producers send "Poison Pills"), you need to know exactly the number of all the producers, and to check the number of "Poison Pills" in consumer, if it equals to the number of producers, which means all producers stopped working, then consumer stops.
In "main thread", you need to catch the
InterruptedException
, since if not, "main thread" might not able to set the "Poison Pill". You can do it like below,Also, you can try to use the
ExecutorService
to solve all your problem.(It works when you just need to do some works and then stop when all are finished)
You might need to read the Book: Java Concurrency in Practice. Trust me, it's the best.
您可以做的(我在最近的项目中所做的)是包装队列,然后添加一个
'isOpen()'
方法。因此,您的编写者线程变得类似于 :
和读者线程:
您当然可以扩展列表,但这会给用户提供您可能不想要的访问级别。您需要向此代码添加一些并发事物,例如 AtomicBoolean。
What you could do (which I did in a recent project) is to wrap the queue and then add a
'isOpen()'
method.So your writer thread becomes something like :
and the reader thread:
You could of course extend the list instead but that gives the user a level of access you might not want. And you need to add some concurrency thingies to this code, like AtomicBoolean.