How do I know when a group of RabbitMQ tasks is complete?

Posted 2024-12-09 12:02:00

I am using RabbitMQ to have worker processes encode video files. I would like to know when all of the files are complete - that is, when all of the worker processes have finished.

The only way I can think to do this is by using a database. When a video finishes encoding:

UPDATE videos SET status = 'complete' WHERE filename = 'foo.wmv'
-- etc etc etc as each worker finishes --

And then to check whether or not all of the videos have been encoded:

SELECT count(*) FROM videos WHERE status != 'complete'
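For concreteness, the database approach above can be sketched with Python's built-in sqlite3 module. The table layout (a videos table with filename and status columns) is inferred from the two queries; the in-memory database, the 'pending' initial status, and the helper names mark_complete and remaining are hypothetical.

```python
import sqlite3

# In-memory stand-in for the hypothetical "videos" table from the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE videos (filename TEXT PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO videos (filename, status) VALUES (?, 'pending')",
    [("foo.wmv",), ("bar.wmv",), ("baz.wmv",)],
)


def mark_complete(filename: str) -> None:
    """What each worker would run after it finishes encoding one file."""
    with conn:  # the connection context manager commits the UPDATE on success
        conn.execute(
            "UPDATE videos SET status = 'complete' WHERE filename = ?", (filename,)
        )


def remaining() -> int:
    """The completion check: how many videos are not yet complete."""
    (count,) = conn.execute(
        "SELECT count(*) FROM videos WHERE status != 'complete'"
    ).fetchone()
    return count


mark_complete("foo.wmv")
mark_complete("bar.wmv")
print(remaining())  # 1
mark_complete("baz.wmv")
print(remaining())  # 0
```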

But if I'm going to do this, then I feel like I am losing the benefit of RabbitMQ as a mechanism for multiple distributed worker processes, since I still have to manually maintain a database queue.

Is there a standard mechanism for RabbitMQ dependencies? That is, a way to say "wait for these 5 tasks to finish, and once they are done, then kick off a new task?"

I don't want to have a parent process add these tasks to a queue and then "wait" for each of them to return a "completed" status. Then I have to maintain a separate process for each group of videos, at which point I've lost the advantage of decoupled worker processes as compared to a single ThreadPool concept.

Am I asking for something which is impossible? Or, are there standard widely-adopted solutions to manage the overall state of tasks in a queue that I have missed?

Edit: after searching, I found this similar question: Getting result of a long running task with RabbitMQ

Are there any particular thoughts that people have about this?

Comments (3)

你好,陌生人 2024-12-16 12:02:00

Use a "response" queue. I don't know any specifics about RabbitMQ, so this is general:

  • Have your parent process send out requests and keep track of how many it sent
  • Make the parent process also wait on a specific response queue (that the children know about)
  • Whenever a child finishes something (or can't finish for some reason), send a message to the response queue
  • Whenever numSent == numResponded, you're done
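The steps above can be sketched in Python. To keep the sketch self-contained and runnable, it uses the standard library's queue.Queue and threads as stand-ins for RabbitMQ queues and worker processes; with a real broker the parent would publish to a work queue and consume from a response queue through a client library instead. The encode placeholder, run_batch, and the file names are hypothetical.

```python
import queue
import threading

# Stand-ins (assumption): with RabbitMQ these would be a shared work queue and a
# response queue that the workers know about.
work_queue: queue.Queue = queue.Queue()
response_queue: queue.Queue = queue.Queue()


def encode(filename: str) -> bool:
    """Hypothetical placeholder for the real encoding job; returns success."""
    return not filename.startswith("bad")


def worker() -> None:
    while True:
        filename = work_queue.get()
        if filename is None:  # sentinel: no more work for this worker
            break
        ok = encode(filename)
        # Reply on success *and* failure, so the parent can count every request.
        response_queue.put((filename, ok))


def run_batch(filenames: list[str], num_workers: int = 3) -> dict[str, bool]:
    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    num_sent = 0
    for name in filenames:
        work_queue.put(name)
        num_sent += 1

    results = {}
    num_responded = 0
    while num_responded < num_sent:  # finished once numSent == numResponded
        name, ok = response_queue.get()
        results[name] = ok
        num_responded += 1

    for _ in threads:  # shut the demo workers down
        work_queue.put(None)
    for t in threads:
        t.join()
    return results


results = run_batch(["a.wmv", "b.wmv", "bad.wmv"])
print(sorted(results.items()))
# [('a.wmv', True), ('b.wmv', True), ('bad.wmv', False)]
```

With RabbitMQ specifically, the AMQP message properties reply_to and correlation_id are the conventional way to tell a worker where to send its reply and to match each reply to its request.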

Something to keep in mind is timeouts: what happens if a child process dies? You have to do slightly more work, but basically:

  • With every sent message, include some sort of ID, and add that ID and the current time to a hash table.
  • For every response, remove that ID from the hash table
  • Periodically walk the hash table and remove anything that has timed out
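A minimal sketch of that timeout bookkeeping in Python. The PendingTracker class and its method names are hypothetical; request IDs come from uuid.uuid4(), and the clock is injectable so the example can simulate a child that never replies. The answer only says to remove timed-out entries; recording them in timed_out is an added assumption so the parent can decide whether to retry or report them.

```python
import time
import uuid


class PendingTracker:
    """Hypothetical helper: outstanding request IDs, so lost replies time out."""

    def __init__(self, timeout_s: float, clock=time.monotonic):
        self.timeout_s = timeout_s
        self.clock = clock
        self.pending: dict[str, float] = {}  # request ID -> time it was sent
        self.timed_out: list[str] = []       # assumption: keep expired IDs

    def sent(self) -> str:
        request_id = str(uuid.uuid4())  # ID to attach to the outgoing message
        self.pending[request_id] = self.clock()
        return request_id

    def responded(self, request_id: str) -> None:
        # pop() with a default ignores late or duplicate replies.
        self.pending.pop(request_id, None)

    def sweep(self) -> list[str]:
        """Call periodically: drop (and record) requests older than the timeout."""
        now = self.clock()
        expired = [rid for rid, t in self.pending.items() if now - t > self.timeout_s]
        for rid in expired:
            del self.pending[rid]
        self.timed_out.extend(expired)
        return expired

    def done(self) -> bool:
        return not self.pending


fake_now = [0.0]
tracker = PendingTracker(timeout_s=30.0, clock=lambda: fake_now[0])
a = tracker.sent()
b = tracker.sent()
tracker.responded(a)  # the child handling a replied
fake_now[0] = 31.0    # the child handling b never replies
expired = tracker.sweep()
print(expired == [b], tracker.done())  # True True
```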

This is called the Request Reply Pattern.

沦落红尘 2024-12-16 12:02:00

[Diagram (image not available)]

Based on Brendan's extremely helpful answer, which should be accepted, I knocked up this quick diagram, which may be helpful to some.

诺曦 2024-12-16 12:02:00

I have implemented a workflow where the workflow state machine is implemented as a series of queues. A worker receives a message on one queue, processes the work, and then publishes the same message onto another queue. Then another type of worker process picks up that message, etc.
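A minimal sketch of that queue-per-state workflow, using Python threads and queue.Queue as stand-ins for RabbitMQ worker processes and queues. The stage names (encode, thumbnail, publish) and the message shape are hypothetical, and the None sentinel exists only so the demo shuts down cleanly.

```python
import queue
import threading

# Hypothetical workflow: each state is a queue. A worker for a stage consumes from
# its queue, does its step, then republishes the same message to the next queue.
STAGES = ["encode", "thumbnail", "publish"]
queues = {name: queue.Queue() for name in STAGES + ["finished"]}
next_of = dict(zip(STAGES, STAGES[1:] + ["finished"]))


def stage_worker(stage: str, next_stage: str) -> None:
    inbox, outbox = queues[stage], queues[next_stage]
    while True:
        msg = inbox.get()
        if msg is None:  # sentinel: pass it downstream, then stop
            outbox.put(None)
            break
        msg["history"].append(stage)  # stand-in for this stage's real work
        outbox.put(msg)               # same message, next queue


threads = [threading.Thread(target=stage_worker, args=(s, next_of[s])) for s in STAGES]
for t in threads:
    t.start()

for name in ["foo.wmv", "bar.wmv"]:
    queues["encode"].put({"file": name, "history": []})
queues["encode"].put(None)

for t in threads:
    t.join()

finished = []
while (msg := queues["finished"].get()) is not None:
    finished.append(msg)
print([(m["file"], m["history"]) for m in finished])
```

Because each stage only knows its own input queue and the next one, adding a step means adding a queue and a worker type rather than changing a central coordinator.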

In your case, it sounds like you need to implement one of the patterns from Enterprise Integration Patterns (which is available as a free online book) and have a simple worker that collects messages until a set of work is done, and then publishes a single message to a queue representing the next step in the workflow.
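The collecting worker described above corresponds to what Enterprise Integration Patterns calls an Aggregator. A minimal Python sketch, again using queue.Queue in place of RabbitMQ queues; the message fields (batch_id, batch_size, file) are hypothetical and assume each worker's completion message says which batch it belongs to and how many tasks that batch contains.

```python
import queue
from collections import defaultdict

# Stand-ins (assumption) for a "task completed" queue and a "next step" queue.
completions: queue.Queue = queue.Queue()
next_step: queue.Queue = queue.Queue()


def aggregator(inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Collects per-task completions; emits one message per finished batch."""
    seen = defaultdict(set)  # batch_id -> files reported complete so far
    while True:
        msg = inbox.get()
        if msg is None:  # sentinel, only so this demo terminates
            break
        batch = msg["batch_id"]
        seen[batch].add(msg["file"])  # a set ignores duplicate deliveries
        if len(seen[batch]) == msg["batch_size"]:
            outbox.put({"batch_id": batch, "files": sorted(seen.pop(batch))})


# "Wait for these 5 tasks to finish": v2 is delivered twice, b2 never completes.
for f in ["v1", "v2", "v3", "v4", "v2", "v5"]:
    completions.put({"batch_id": "b1", "batch_size": 5, "file": f})
completions.put({"batch_id": "b2", "batch_size": 3, "file": "x1"})
completions.put(None)

aggregator(completions, next_step)
first = next_step.get_nowait()
print(first)  # {'batch_id': 'b1', 'files': ['v1', 'v2', 'v3', 'v4', 'v5']}
print(next_step.empty())  # True: batch b2 is still incomplete
```

This sketch keeps its state in memory; a real aggregator would likely need to persist it so that a restart does not lose partially collected batches.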
