Python 中的多处理队列

发布于 2024-11-24 02:47:07 字数 798 浏览 2 评论 0原文

我正在尝试在 Python 中使用带有多处理库的队列。执行下面的代码后(打印语句起作用),但是在我调用队列上的 join 之后进程不会退出并且仍然存在。如何终止剩余进程?

谢谢!

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()

I'm trying to use a queue with the multiprocessing library in Python. After executing the code below (the print statements work), but the processes do not quit after I call join on the Queue and there are still alive. How can I terminate the remaining processes?

Thanks!

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

苹果你个爱泡泡 2024-12-01 02:47:07

试试这个:

import multiprocessing

num_procs = 4
def do_work(message):
  print "work",message ,"completed"

def worker():
  for item in iter( q.get, None ):
    do_work(item)
    q.task_done()
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  procs.append( multiprocessing.Process(target=worker) )
  procs[-1].daemon = True
  procs[-1].start()

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

q.join()

for p in procs:
  q.put( None )

q.join()

for p in procs:
  p.join()

print "Finished everything...."
print "num active children:", multiprocessing.active_children()

try this:

import multiprocessing

num_procs = 4
def do_work(message):
  print "work",message ,"completed"

def worker():
  for item in iter( q.get, None ):
    do_work(item)
    q.task_done()
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  procs.append( multiprocessing.Process(target=worker) )
  procs[-1].daemon = True
  procs[-1].start()

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

q.join()

for p in procs:
  q.put( None )

q.join()

for p in procs:
  p.join()

print "Finished everything...."
print "num active children:", multiprocessing.active_children()
七颜 2024-12-01 02:47:07

您的工作人员需要一个哨兵来终止,否则他们将只是坐在阻塞读取上。请注意,在 Q 上使用 sleep 而不是 P 上的 join 可以让您显示状态信息等。
我首选的模板是:

def worker(q,nameStr):
  print 'Worker %s started' %nameStr
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful
     q.task_done()
  print 'Worker %s Finished' % nameStr
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  nameStr = 'Worker_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  p.daemon = True
  p.start()
  procs.append(p)

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

for i in range(num_procs):
  q.put(None) # send termination sentinel, one for each process

while not q.empty(): # wait for processing to finish
  sleep(1)   # manage timeouts and status updates etc.

Your workers need a sentinel to terminate, or they will just sit on the blocking reads. Note that using sleep on the Q instead of join on the P lets you display status information etc.
My preferred template is:

def worker(q,nameStr):
  print 'Worker %s started' %nameStr
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful
     q.task_done()
  print 'Worker %s Finished' % nameStr
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  nameStr = 'Worker_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  p.daemon = True
  p.start()
  procs.append(p)

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

for i in range(num_procs):
  q.put(None) # send termination sentinel, one for each process

while not q.empty(): # wait for processing to finish
  sleep(1)   # manage timeouts and status updates etc.
浮生未歇 2024-12-01 02:47:07

这里有一个无哨兵方法,适用于相对简单的情况,您将多个任务放在JoinableQueue上,然后启动消耗任务的工作进程,并在读取后退出队列“干”。诀窍是使用 JoinableQueue.get_nowait() 而不是 get()get_nowait(),顾名思义,尝试以非阻塞方式从队列中获取值,如果没有任何内容可获取,则抛出 queue.Empty 异常被提出。工作线程通过退出来处理该异常。

简单的代码来说明原理:

import multiprocessing as mp
from queue import Empty

def worker(q):
  while True:
    try:
      work = q.get_nowait()
      # ... do something with `work`
      q.task_done()
    except Empty:
      break # completely done

# main
worknum = 4
jq = mp.JoinableQueue()

# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
  jq.put(task)

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
  p.start()

for p in procs:
  p.join()

优点是不需要在队列中放入“毒丸”,因此代码短一些。

重要:在更复杂的情况下,生产者和消费者以“交错”方式使用同一个队列,并且工作人员可能必须等待新任务的出现,“应采用“毒丸”的方法。我上面的建议是针对简单的情况,工作人员“知道”如果任务队列是空的,那么就没有必要再犹豫了。

Here is a sentinel-free method for the relatively simple case where you put a number of tasks on a JoinableQueue, then launch worker processes that consume the tasks and exit once they read the queue "dry". The trick is to use JoinableQueue.get_nowait() instead of get(). get_nowait(), as the name implies, tries to get a value from the queue in a non-blocking manner and if there's nothing to be gotten then a queue.Empty exception is raised. The worker handles this exception by exiting.

Rudimentary code to illustrate the principle:

import multiprocessing as mp
from queue import Empty

def worker(q):
  while True:
    try:
      work = q.get_nowait()
      # ... do something with `work`
      q.task_done()
    except Empty:
      break # completely done

# main
worknum = 4
jq = mp.JoinableQueue()

# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
  jq.put(task)

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
  p.start()

for p in procs:
  p.join()

The advantage is that you do not need to put the "poison pills" on the queue so the code is a bit shorter.

IMPORTANT : in more complex situations where producers and consumers use the same queue in an "interleaved" manner and the workers may have to wait for new tasks to come along, the "poison pill" approach should be used. My suggestion above is for simple cases where the workers "know" that if the task queue is empty, then there's no point hanging around any more.

情独悲 2024-12-01 02:47:07

在加入进程之前必须清除队列,但 q.empty() 不可靠。

清除队列的最好方法是统计成功获取或循环的次数,直到收到哨兵值,就像具有可靠网络的套接字一样。

You have to clear the queue before joining the process, but q.empty() is unreliable.

The best way to clear the queue is to count the number of successful gets or loop until you receive a sentinel value, just like a socket with a reliable network.

予囚 2024-12-01 02:47:07

下面的代码可能不是很相关,但我将其发布以供您发表评论/反馈,以便我们可以共同学习。谢谢你!

import multiprocessing

def boss(q,nameStr):
  source = range(1024)
  for item in source:
    q.put(nameStr+' '+str(item))
  q.put(None) # send termination sentinel, one for each process

def worker(q,nameStr):
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful

q = multiprocessing.Queue()

procs = []

num_procs = 4
for i in range(num_procs):
  nameStr = 'ID_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  procs.append(p)
  p = multiprocessing.Process(target=boss,   args=(q,nameStr))
  procs.append(p)

for j in procs:
  j.start()
for j in procs:
  j.join()

The code below may not be very relevant but I post it for your comments/feedbacks so we can learn together. Thank you!

import multiprocessing

def boss(q,nameStr):
  source = range(1024)
  for item in source:
    q.put(nameStr+' '+str(item))
  q.put(None) # send termination sentinel, one for each process

def worker(q,nameStr):
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful

q = multiprocessing.Queue()

procs = []

num_procs = 4
for i in range(num_procs):
  nameStr = 'ID_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  procs.append(p)
  p = multiprocessing.Process(target=boss,   args=(q,nameStr))
  procs.append(p)

for j in procs:
  j.start()
for j in procs:
  j.join()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文