A semaphore-like mechanism for Celery
We're developing a distributed application in Python, using Celery for our task queue.
Our application needs to download emails from a remote ISP via IMAP (e.g. gmail), and we'd like to be able to do this in parallel. For a given email account you're granted only a limited number of simultaneous connections, so we need a way to atomically keep track of the active connections for every account being downloaded.
I've found multiple examples of atomic locks for Celery using Redis, but none that can keep track of a pool of limited resources like this, and all attempts to implement our own have resulted in difficult-to-debug race conditions, causing our locks to intermittently never get released.
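For reference, the examples we found are all variations on a single Redis mutex, roughly like the sketch below (assuming redis-py; the key name, value, and timeout are placeholders). It guards one resource at a time, which is why it doesn't map onto a pool of N connections per account.

import redis

r = redis.Redis()

def try_lock(key, timeout=300):
    # SET key value NX EX timeout -- atomically acquire only if nobody else holds it
    return r.set(key, "locked", nx=True, ex=timeout)

def release_lock(key):
    r.delete(key)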
Comments (1)
As Celery uses the multiprocessing library for its processes, you should be able to use the process-safe multiprocessing.Semaphore([value]). You will want to create the semaphore up front and pass it in, and you can set a default value equal to the maximum number of concurrent accesses you want to allow. Then acquire before your IMAP connection and release after you disconnect.
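A minimal sketch of that suggestion, assuming the default prefork worker pool so that a semaphore created at import time is inherited by the forked child processes (the broker URL, the limit of 5, and the task arguments are illustrative):

import imaplib
import multiprocessing

from celery import Celery

app = Celery("mail", broker="redis://localhost:6379/0")  # broker URL is a placeholder

# Created up front, before the worker forks; allows at most 5 concurrent IMAP connections.
imap_semaphore = multiprocessing.Semaphore(5)

@app.task
def download_account(host, user, password):
    imap_semaphore.acquire()          # block until a connection slot frees up
    try:
        conn = imaplib.IMAP4_SSL(host)
        conn.login(user, password)
        # ... fetch and store messages here ...
        conn.logout()
    finally:
        imap_semaphore.release()      # always give the slot back, even on errors

Note that a multiprocessing.Semaphore only coordinates processes forked from the same parent, i.e. the worker pool on a single machine, so the count would not be shared between workers running on different hosts.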