Saving a Python queue to a file
I am using the Python Queue class to manage a list of tasks shared between multiple worker threads. The actual code is huge and I'm still in the process of making it entirely bug-free. From time to time, a worker thread will crash and I have to restart the whole routine. In the process I lose all the tasks that have been queued. Is there a way to save the queue to a file so that whenever I restart the process, the task list is preloaded from that file?
On first thought it seems that as I get or put tasks into the queue, I should be reading from and writing to a file simultaneously. However, this doesn't give me the functionality of queue.task_done() and may not be the most efficient solution. Any ideas would be greatly appreciated.
Have you considered simply pickling your queue?
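A minimal sketch of that suggestion: `queue.Queue` itself is not picklable (it holds locks), so the idea here is to pickle the pending items instead and rebuild the queue on restart. The file name and helper functions are illustrative, not from the answer:

```python
import pickle
import queue

def save_queue(q, path):
    """Pickle the queue's current pending items to disk."""
    items = list(q.queue)  # q.queue is the underlying deque of pending items
    with open(path, "wb") as f:
        pickle.dump(items, f)

def load_queue(path):
    """Rebuild a queue.Queue from a previously pickled item list."""
    q = queue.Queue()
    try:
        with open(path, "rb") as f:
            for item in pickle.load(f):
                q.put(item)
    except FileNotFoundError:
        pass  # no saved state yet; start with an empty queue
    return q

q = queue.Queue()
for task in ["resize image", "send email", "build report"]:
    q.put(task)
save_queue(q, "tasks.pkl")

restored = load_queue("tasks.pkl")
print(list(restored.queue))  # → ['resize image', 'send email', 'build report']
```

Note that reaching into `q.queue` is only safe while no worker thread is consuming from the queue, e.g. at shutdown or at checkpoints taken under a lock.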
There are multiple approaches to this, including the pickle module... But in my opinion it would be simplest to just write to a file, line per line, with each element of the queue in columns containing the other properties you may want to save, like task_done. Reading a file formatted like this back in is super easy in Python.
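The answer's original example snippet did not survive in this copy; the following sketch illustrates the idea with an assumed semicolon-separated layout (`task;done`), which is my own choice of format, not the author's:

```python
import csv

# Each queued task becomes one line; the second column stores its
# task_done flag (0 = pending, 1 = completed).
tasks = [("resize image", False), ("send email", True), ("build report", False)]

# write the queue state, one task per line
with open("queue_state.txt", "w", newline="") as f:
    writer = csv.writer(f, delimiter=";")
    for name, done in tasks:
        writer.writerow([name, int(done)])

# read it back, re-queuing only the tasks that were not completed
pending = []
with open("queue_state.txt", newline="") as f:
    for name, done in csv.reader(f, delimiter=";"):
        if done == "0":
            pending.append(name)

print(pending)  # → ['resize image', 'build report']
```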
The easy way to do this is to use AMQP for the message queues and let the message broker take care of the messages for you. I implemented a similar system using RabbitMQ as the message broker with durable persistent queues. The messages have even survived a crash of the RabbitMQ server software when I was using an outdated 1.72 server version on a virtual Linux server with only 512M of RAM and a million or so messages in play.
The way I do it is that each type of worker consumes messages from a different queue. If I need more than one worker of that type, then the message queue automatically round-robins between them, and if a worker cannot complete processing a message, it just doesn't ack the message and it goes back on the queue.
I wrote a little shim module with about 80 lines of code to sit in front of kombu, and later rewrote that to use py-amqplib. If I had known about haigha earlier I would have used that, since it matches very closely to the AMQP specification document. I do not recommend kombu because it is so complex to debug and diverges from the AMQP standard in weird ways. Have a look at haigha: even though its documentation is no more than one example code fragment on PyPI, it is better documented than either kombu or amqplib, because you can use the AMQP specs as your haigha docs.
A simple option I can offer would be to wrap a database table in a class and use that as your queue. An auto-increment column works wonders for this (the next item to remove is the one with the lowest ID).
This won't have the best performance, depending on how often the queue gets updated; even an in-memory sqlite database won't be as fast as a linked-list structure. The flip side is that you can inspect the queue with any tool that can access the database, so you can see which tasks are in progress.
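A minimal sketch of that idea using the standard library's sqlite3 module; the table name, column names, and class interface are illustrative, not from the answer:

```python
import sqlite3

class SqliteQueue:
    """A persistent FIFO task queue backed by a SQLite table.

    The auto-increment primary key gives FIFO order; the `done` flag
    marks completed tasks so unfinished work survives a restart.
    """

    def __init__(self, path):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "payload TEXT, done INTEGER DEFAULT 0)"
        )
        self.conn.commit()

    def put(self, payload):
        self.conn.execute("INSERT INTO tasks (payload) VALUES (?)", (payload,))
        self.conn.commit()

    def get(self):
        """Return (id, payload) of the oldest pending task, or None."""
        return self.conn.execute(
            "SELECT id, payload FROM tasks WHERE done = 0 ORDER BY id LIMIT 1"
        ).fetchone()

    def task_done(self, task_id):
        self.conn.execute("UPDATE tasks SET done = 1 WHERE id = ?", (task_id,))
        self.conn.commit()

q = SqliteQueue(":memory:")  # use a file path instead to persist across restarts
q.put("resize image")
q.put("send email")
task_id, payload = q.get()
print(payload)         # → resize image
q.task_done(task_id)
print(q.get()[1])      # → send email
```

For the original multi-threaded setup you would also need a lock around the connection (or one connection per thread), since sqlite3 connections are not shared across threads by default.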
Implement a handshaking mechanism between the workers and the master.
The master has a list of tasks; before putting them into a Queue, it pickles the list to a file.
It then inserts the tasks into the Queue. When a worker is done, it sends back an ACK message.
Only at that point does the master delete the corresponding id from the task list and re-pickle it.
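A minimal single-process sketch of that scheme. The file name, the id-to-task dictionary, and the use of a second Queue as the ACK channel are my own assumptions about how to fill in the details:

```python
import pickle
import queue
import threading

STATE_FILE = "pending_tasks.pkl"

def save_pending(tasks):
    """Persist the master's id -> task map so a restart can reload it."""
    with open(STATE_FILE, "wb") as f:
        pickle.dump(tasks, f)

pending = {1: "resize image", 2: "send email"}  # id -> task
save_pending(pending)            # pickle the list before dispatching

task_q = queue.Queue()
ack_q = queue.Queue()            # workers ACK completed task ids here

for task_id, payload in pending.items():
    task_q.put((task_id, payload))

def worker():
    while True:
        task_id, payload = task_q.get()
        # ... do the actual work on `payload` here ...
        ack_q.put(task_id)       # handshake: tell the master we're done
        task_q.task_done()

threading.Thread(target=worker, daemon=True).start()

# master: remove a task from the persisted list only after its ACK arrives
for _ in range(len(pending)):
    done_id = ack_q.get()
    del pending[done_id]
    save_pending(pending)        # a crash now will not redo finished tasks

print(pending)  # → {}
```

On restart, the master unpickles `pending_tasks.pkl` and re-queues whatever ids remain, so a worker crash between dispatch and ACK only re-runs unacknowledged tasks.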