How to lay out the queue/worker structure to support large tasks for multiple environments?

Posted 2024-08-31 12:26:03

For a Python/Django/Celery based deployment tool, we have the following setup:

  1. We currently use the default Celery setup. (One queue+exchange called "celery".)
  2. Each task on the queue represents a deployment operation.
  3. Each task for an environment ends with a synchronisation phase that can take (very) long.

The following specs need to be fulfilled:

  1. Concurrency: tasks for multiple environments should be carried out concurrently.
  2. Locking: There may be at most one task running for each environment at the same time (i.e. environments lock).
  3. Throughput optimization: when there are multiple tasks for a single environment, their sync phases may be combined as an optimization. So when a task nears its end, it should check whether new tasks are waiting in the queue for this environment and, if so, skip its own sync phase.

What is the preferred way to implement this?

Some thoughts:

  • I would say we have to set up multiple queues: one per environment, with N Celery workers each exclusively processing a single queue. (This would solve specs 1+2; see the sketch after this list.)
    But how do we get multiple Celery workers to listen to different queues exclusively?
  • Is there a clean way of knowing there are more tasks waiting in the queue for an environment?
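
For illustration, a minimal sketch of that idea, assuming a hypothetical deploy task and an env_<name> queue naming convention (both are our own choices, not anything Celery prescribes):

from celery import Celery

app = Celery('deploytool', broker='amqp://')  # broker URL is illustrative

@app.task
def deploy(environment):
    ...  # deployment steps, then the long sync phase

# Enqueue onto the environment's own queue; a worker bound exclusively
# to that queue then serializes all tasks for that environment.
deploy.apply_async(args=['production'], queue='env_production')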

2 Answers

千鲤 2024-09-07 12:26:03

For specs 1 and 2, use multiple queues and launch workers with -Q to specify which queue to listen to.
Also configure CELERYD_PREFETCH_MULTIPLIER = 1 so each worker reserves only one task at a time.
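
As a rough sketch, with illustrative per-environment queue names, the workers could be started like this:

# one worker per environment, bound exclusively to its own queue
celery worker -Q env_production --concurrency=1
celery worker -Q env_staging --concurrency=1

Together with CELERYD_PREFETCH_MULTIPLIER = 1, --concurrency=1 means each environment's worker handles a single task at a time, which also gives you the locking from spec 2.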

To get the queue length (tested with RabbitMQ), you can use something like this:

from kombu.connection import BrokerConnection

# BROKER_HOST etc. are your broker settings; passive=True asks about
# the queue without creating it.
connection = BrokerConnection(BROKER_HOST, BROKER_USER, BROKER_PASSWORD)
channel = connection.channel()

# queue_declare returns (queue_name, message_count, consumer_count)
name, jobs, consumers = channel.queue_declare('celery', passive=True)
print('celery: %d jobs in queue' % jobs)

queue_declare, as a side effect, gives you the queue's length.
Hope this helps.
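
Spec 3 can be built on top of this check: near the end of a deployment task, peek at the environment's queue and skip the sync phase if more work is already waiting. A rough sketch reusing the same trick (the helper and task names are hypothetical):

from kombu.connection import BrokerConnection

def pending_jobs(channel, queue_name):
    # The declare-ok reply carries the queue's current message count.
    _, message_count, _ = channel.queue_declare(queue_name, passive=True)
    return message_count

@app.task
def deploy(environment):
    do_deployment(environment)  # hypothetical deployment steps
    connection = BrokerConnection(BROKER_HOST, BROKER_USER, BROKER_PASSWORD)
    try:
        if pending_jobs(connection.channel(), 'env_%s' % environment) == 0:
            run_sync_phase(environment)  # hypothetical sync step
        # else: skip the sync; the waiting task will sync for both
    finally:
        connection.release()

There is a race here (a task enqueued after the check still runs its own sync), but that only costs an extra sync; it never skips a needed one.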

哑剧 2024-09-07 12:26:03

I would take a look at ZeroMQ: it can do messaging and multi-threading in one super-fast library. It also supports a large number of languages and has built-in load balancing.
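
For context on the load-balancing claim: ZeroMQ's PUSH/PULL socket pair round-robins messages across whatever workers are connected. A minimal pyzmq sketch (the port and payload are arbitrary):

import zmq

# Producer: PUSH fans messages out round-robin to connected PULL workers.
context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind('tcp://*:5557')
sender.send_json({'env': 'production', 'action': 'deploy'})

# Worker (in a separate process):
#   receiver = zmq.Context().socket(zmq.PULL)
#   receiver.connect('tcp://localhost:5557')
#   task = receiver.recv_json()

Note that plain PUSH/PULL gives you concurrency but no per-environment locking, so specs 2 and 3 would still have to be built on top.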
