Checking for an empty Queue in Python's multiprocessing

Posted on 2024-11-30 12:21:13

I have a program that uses Python's multiprocessing and Queue packages. One of my functions has this structure:

from multiprocessing import Process, Queue

def foo(queue):
    while True:
        try:
            a = queue.get(block = False)
            doAndPrintStuff(a)
        except:
            print "the end"
            break

if __name__ == "__main__":
    nthreads = 4
    queue = Queue.Queue()
    # put stuff in the queue here
    for stuff in moreStuff:
        queue.put(stuff)
    procs = [Process(target = foo, args = (queue,)) for i in xrange(nthreads)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

The idea is that when I try to get an item from the queue and it is empty, an exception is raised and the loop terminates. So I have two questions:

1) Is this a safe idiom? Are there better ways to do this?

2) I tried to find out exactly which exception is raised when I call .get() on an empty queue. Currently my program catches all exceptions, which is bad when the error is somewhere else and all I get is a "the end" message.

I tried:

import Queue
queue = Queue.Queue()
[queue.put(x) for x in xrange(10)]
try:
    print queue.get(block = False)
except Queue.Empty:
    print "end"
    break

But I got an error, as if I hadn't caught the exception. What is the correct exception to catch?

Comments (4)

虚拟世界 2024-12-07 12:21:13

The exception should be Queue.Empty. But are you sure you got the same error? In your second example you also switched the queue itself from multiprocessing.Queue to Queue.Queue, which I think may be the problem.

It might seem strange, but you have to use the multiprocessing.Queue class together with the Queue.Empty exception, which you have to import yourself from the Queue module (named queue in Python 3).
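
A minimal sketch of that combination, written for Python 3, where the module is spelled queue. The helper name try_get is hypothetical and only there for illustration; the point is that the queue comes from multiprocessing while the exception comes from queue, and that nothing else is caught.

```python
import queue              # spelled "Queue" in Python 2
import multiprocessing


def try_get(q):
    """Return (True, item) if an item was available, else (False, None)."""
    try:
        return True, q.get(block=False)
    except queue.Empty:
        # Only "nothing available right now" is handled here; any other
        # exception propagates instead of being silently swallowed.
        return False, None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    print(try_get(q))  # (False, None): nothing was ever put on the queue
```

Because only queue.Empty is caught, an error raised elsewhere in the loop body would surface with its own traceback rather than as a misleading "the end" message.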

十级心震 2024-12-07 12:21:13

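It appears that a multiprocessing Queue can look empty until the buffered put() data has been flushed, which may take a while. A non-blocking get() can therefore raise Empty even though items are on their way.

The solution to our problem is to use sentinels, or maybe the built-in task_done() call (in multiprocessing this is provided by JoinableQueue rather than by the plain Queue):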

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.
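
A rough sketch of the sentinel approach, assuming None is never a real work item and that squaring a number stands in for the real work. The names SENTINEL, worker, tasks and results are made up for this example. Each worker uses a blocking get(), so a momentarily empty queue does not end its loop; it stops only when it receives a sentinel.

```python
import multiprocessing

SENTINEL = None  # assumed marker; must never be a real work item


def worker(tasks, results):
    """Process items until the sentinel arrives, then return."""
    # iter(callable, sentinel) keeps calling tasks.get() until it returns
    # SENTINEL. get() blocks, so an empty queue just means "wait".
    for item in iter(tasks.get, SENTINEL):
        results.put(item * item)  # stand-in for the real work


if __name__ == "__main__":
    n_procs = 4
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    items = list(range(10))
    for item in items:
        tasks.put(item)
    for _ in range(n_procs):
        tasks.put(SENTINEL)  # one sentinel per worker

    procs = [multiprocessing.Process(target=worker, args=(tasks, results))
             for _ in range(n_procs)]
    for p in procs:
        p.start()

    # Read the results before join(): a child cannot finish exiting while
    # items it has put are still sitting in its buffer.
    squares = sorted(results.get() for _ in items)
    for p in procs:
        p.join()
    print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Since the queue is first in, first out, the sentinels are only reached after every real item has been taken, and each worker consumes exactly one sentinel before it exits.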

咆哮 2024-12-07 12:21:13

Here's an example. As @Steven said above, you need to use the queue.Empty exception from the standard queue module. The note from the documentation (https://docs.python.org/3/library/multiprocessing.html):

Note

multiprocessing uses the usual queue.Empty and queue.Full exceptions
to signal a timeout. They are not available in the multiprocessing
namespace so you need to import them from queue.

Basic example:

from multiprocessing import Process, Queue, Manager
import queue

def firstPass(q):
    # getDriver, f and logger are assumed to be defined elsewhere
    driver = getDriver()

    while True:
        try:
            link = q.get_nowait()
            f(driver, link)
        except queue.Empty:
            # nothing left to fetch: clean up and stop
            logger.info("empty queue")
            driver.close()
            break

秋叶绚丽 2024-12-07 12:21:13

Try reading the queue library docs. Aren't you looking for Queue.empty()?
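
One caveat: the multiprocessing documentation describes Queue.empty() as not reliable, so checking empty() and then calling get() can still race. A possible alternative, sketched below, is to skip empty() and use get() with a timeout. The helper name drain and the 0.5 second default are assumptions picked for illustration, not recommended values.

```python
import queue
import multiprocessing


def drain(q, timeout=0.5):
    """Collect items until none arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            # Waiting briefly gives buffered put() data time to arrive,
            # which a bare empty() check does not.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for x in range(3):
        q.put(x)
    print(drain(q))
```

A timeout only makes a premature exit unlikely when all items are queued before the consumers start. If producers are still running, an explicit end-of-work signal is probably the more robust choice.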
