Python 中的多处理队列
我正在尝试在 Python 中使用带有多处理库的队列。执行下面的代码后(打印语句起作用),但是在我调用队列上的 join 之后进程不会退出并且仍然存在。如何终止剩余进程?
谢谢!
def MultiprocessTest(self):
print "Starting multiprocess."
print "Number of CPUs",multiprocessing.cpu_count()
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = multiprocessing.JoinableQueue()
for i in range(num_procs):
p = multiprocessing.Process(target=worker)
p.daemon = True
p.start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
print "q close"
q.join()
#q.close()
print "Finished everything...."
print "num active children:",multiprocessing.active_children()
I'm trying to use a queue with the multiprocessing library in Python. After executing the code below (the print statements work), but the processes do not quit after I call join on the Queue and there are still alive. How can I terminate the remaining processes?
Thanks!
def MultiprocessTest(self):
print "Starting multiprocess."
print "Number of CPUs",multiprocessing.cpu_count()
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = multiprocessing.JoinableQueue()
for i in range(num_procs):
p = multiprocessing.Process(target=worker)
p.daemon = True
p.start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
print "q close"
q.join()
#q.close()
print "Finished everything...."
print "num active children:",multiprocessing.active_children()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(5)
试试这个:
try this:
您的工作人员需要一个哨兵来终止,否则他们将只是坐在阻塞读取上。请注意,在 Q 上使用 sleep 而不是 P 上的 join 可以让您显示状态信息等。
我首选的模板是:
Your workers need a sentinel to terminate, or they will just sit on the blocking reads. Note that using sleep on the Q instead of join on the P lets you display status information etc.
My preferred template is:
这里有一个无哨兵方法,适用于相对简单的情况,您将多个任务放在
JoinableQueue
上,然后启动消耗任务的工作进程,并在读取后退出队列“干”。诀窍是使用JoinableQueue.get_nowait()
而不是get()
。get_nowait()
,顾名思义,尝试以非阻塞方式从队列中获取值,如果没有任何内容可获取,则抛出queue.Empty
异常被提出。工作线程通过退出来处理该异常。简单的代码来说明原理:
优点是不需要在队列中放入“毒丸”,因此代码短一些。
重要:在更复杂的情况下,生产者和消费者以“交错”方式使用同一个队列,并且工作人员可能必须等待新任务的出现,“应采用“毒丸”的方法。我上面的建议是针对简单的情况,工作人员“知道”如果任务队列是空的,那么就没有必要再犹豫了。
Here is a sentinel-free method for the relatively simple case where you put a number of tasks on a
JoinableQueue
, then launch worker processes that consume the tasks and exit once they read the queue "dry". The trick is to useJoinableQueue.get_nowait()
instead ofget()
.get_nowait()
, as the name implies, tries to get a value from the queue in a non-blocking manner and if there's nothing to be gotten then aqueue.Empty
exception is raised. The worker handles this exception by exiting.Rudimentary code to illustrate the principle:
The advantage is that you do not need to put the "poison pills" on the queue so the code is a bit shorter.
IMPORTANT : in more complex situations where producers and consumers use the same queue in an "interleaved" manner and the workers may have to wait for new tasks to come along, the "poison pill" approach should be used. My suggestion above is for simple cases where the workers "know" that if the task queue is empty, then there's no point hanging around any more.
在加入进程之前必须清除队列,但 q.empty() 不可靠。
清除队列的最好方法是统计成功获取或循环的次数,直到收到哨兵值,就像具有可靠网络的套接字一样。
You have to clear the queue before joining the process, but q.empty() is unreliable.
The best way to clear the queue is to count the number of successful gets or loop until you receive a sentinel value, just like a socket with a reliable network.
下面的代码可能不是很相关,但我将其发布以供您发表评论/反馈,以便我们可以共同学习。谢谢你!
The code below may not be very relevant but I post it for your comments/feedbacks so we can learn together. Thank you!