multiprocessing Queue maxsize is limited to 32767
I'm trying to write a Python 2.6 (OSX) program using multiprocessing, and I want to populate a Queue with more than 32767 items, which appears to be the largest maxsize it accepts.
from multiprocessing import Queue
Queue(2**15) # raises OSError
Queue(32767) works fine, but any higher number (e.g. Queue(32768)) fails with OSError: [Errno 22] Invalid argument.
Is there a workaround for this issue?
One approach would be to wrap your multiprocessing.Queue with a custom class (just on the producer side, or transparently from the consumer's perspective). Using that, you would queue up items to be dispatched to the Queue object that you're wrapping, and only feed things from the local queue (a Python list() object) into the multiprocessing.Queue as space becomes available, with exception handling to throttle when the Queue is full.

That's probably the easiest approach, since it should have the minimum impact on the rest of your code. The custom class should behave just like a Queue while hiding the underlying multiprocessing.Queue behind your abstraction.

(One approach might be to have your producer use threads: one thread to manage the dispatch from a threading Queue to your multiprocessing.Queue, and any other threads actually just feeding the threading Queue.)
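A minimal sketch of the producer-side wrapper described above, assuming Python 3 (where the Full exception lives in the queue module; on Python 2.6 it would be the Queue module). The class name BufferedProducerQueue and its flush() method are hypothetical names chosen for illustration, and a collections.deque stands in for the plain list mentioned above so that removing the oldest item is cheap.

```python
import queue  # multiprocessing.Queue raises queue.Full as well
from collections import deque


class BufferedProducerQueue:
    """Hypothetical producer-side wrapper: put() never blocks and
    overflow is held in a local backlog until the target has room."""

    def __init__(self, target):
        # target: any object whose put_nowait() raises queue.Full,
        # for example multiprocessing.Queue(32767)
        self._target = target
        self._backlog = deque()

    def put(self, item):
        # Always go through the backlog so ordering is preserved.
        self._backlog.append(item)
        self.flush()

    def flush(self):
        """Move backlog items into the target until it is full.

        Returns the number of items still waiting locally."""
        while self._backlog:
            try:
                self._target.put_nowait(self._backlog[0])
            except queue.Full:
                break  # throttle: try again on the next put() or flush()
            self._backlog.popleft()
        return len(self._backlog)
```

A producer would create it with something like BufferedProducerQueue(multiprocessing.Queue(32767)), call put() as usual, and call flush() periodically (and before exiting) so that the backlog drains as consumers free up space. Consumers keep reading from the underlying multiprocessing.Queue unchanged.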
I've already answered the original question, but I do feel like adding that Redis lists are quite reliable, and the Python module's support for them is extremely easy to use for implementing a Queue-like object. These have the advantage of allowing one to scale out over multiple nodes (across a network) as well as over multiple processes.
Basically, to use those you'd just pick a key (string) for your queue name, have your producers push into it, and have your workers (task consumers) loop on blocking pops from that key.
The Redis BLPOP and BRPOP commands both take a list of keys (lists/queues) and an optional timeout value. They return a tuple (key, value) or None (on timeout). So you can easily write up an event-driven system that's very similar to the familiar structure of select() (but at a much higher level). The only things you have to watch for are missing keys and invalid key types (just wrap your queue operations with exception handlers, of course). (If some other application stomps on your shared Redis server, removing keys or replacing keys that you were using as queues with string/integer or other types of values ... well, you have a different problem at that point.) :)
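As a rough sketch of the push / blocking-pop pattern described above: the RedisWorkQueue class below is a hypothetical helper, not part of any library. It assumes a client object exposing rpush(key, value) and blpop(keys, timeout=...) the way the redis-py client does, with blpop returning a (key, value) tuple or None on timeout.

```python
class RedisWorkQueue:
    """Hypothetical FIFO queue on top of a Redis list.

    client is assumed to behave like a redis-py client, e.g. the
    object returned by redis.Redis(); key is the queue name."""

    def __init__(self, client, key):
        self._client = client
        self._key = key

    def put(self, payload):
        # Producers append to the tail of the list.
        self._client.rpush(self._key, payload)

    def get(self, timeout=0):
        """Block until an item arrives and return it.

        Returns None if the timeout (in seconds) expires; a timeout
        of 0 means wait indefinitely."""
        result = self._client.blpop([self._key], timeout=timeout)
        if result is None:
            return None
        _key, value = result  # blpop reports which key was popped
        return value
```

A worker would then loop on get() and process each value, wrapping the call in an exception handler in case the key has been replaced by a non-list value. Pushing with rpush and popping with blpop gives first-in, first-out order; note that a real client typically returns values as bytes unless it is configured to decode responses.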
Another advantage of this model is that Redis does persist its data to the disk. So your work queue could survive system restarts if you chose to allow it.
(Of course you could implement a simple Queue as a table in SQLite or any other SQL system if you really wanted to do so, just using some sort of auto-incrementing index for the sequencing and a column to mark each item as having been "done" (consumed); but that does involve somewhat more complexity than using what Redis gives you "out of the box".)
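For comparison, here is a minimal sketch of the table-based queue mentioned above, using Python's standard sqlite3 module. The table name work_queue, its column names, and the three helper functions are all made up for illustration. It is written with a single consumer in mind; several concurrent consumers would need stronger locking than this shows.

```python
import sqlite3


def create_queue(conn):
    # id supplies the ordering; done marks consumed items.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS work_queue ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " done INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()


def enqueue(conn, payload):
    conn.execute("INSERT INTO work_queue (payload) VALUES (?)", (payload,))
    conn.commit()


def dequeue(conn):
    """Return the oldest unconsumed payload and mark it done,
    or return None if nothing is pending."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            "SELECT id, payload FROM work_queue"
            " WHERE done = 0 ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        conn.execute("UPDATE work_queue SET done = 1 WHERE id = ?", (row[0],))
        return row[1]
```

Even this small version has to handle schema creation, transactions, and polling for new rows, which is the extra complexity the paragraph above refers to.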