What is the best pattern for designing an asynchronous RPC application with Python, Pika, and AMQP?

Posted 2024-12-04 11:53:13

The producer module of my application is run by users who want to submit work to be done on a small cluster. It sends the submissions in JSON form through the RabbitMQ message broker.

I have tried several strategies, and the best so far is the following, which is still not fully working:

Each cluster machine runs a consumer module, which subscribes itself to the AMQP queue and issues a prefetch_count to tell the broker how many tasks it can run at once.

I was able to make it work using SelectConnection from the Pika AMQP library. Both consumer and producer start two channels, one connected to each queue. The producer sends requests on channel [A] and waits for responses on channel [B], and the consumer waits for requests on channel [A] and sends responses on channel [B]. It seems, however, that when the consumer runs the callback that calculates the response, it blocks, so only one task is executed on each consumer at a time.
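The blocking can be reproduced without AMQP at all. In the sketch below (plain Python, no Pika; all names are illustrative), `dispatch` mimics a single-threaded ioloop that invokes callbacks synchronously, so a slow callback keeps the number of in-flight tasks at one:

```python
import time

in_flight = 0
max_in_flight = 0
results = []

def slow_callback(msg):
    # Stand-in for the consumer's computation; tracks how many run at once.
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    time.sleep(0.01)              # pretend to compute something expensive
    results.append(msg * 2)
    in_flight -= 1

def dispatch(messages, callback):
    # Mimics a single-threaded ioloop such as SelectConnection's:
    # each callback runs to completion before the next message is delivered.
    for msg in messages:
        callback(msg)

dispatch(range(5), slow_callback)
```

However large the prefetch, `max_in_flight` never exceeds 1 here, which is exactly the symptom described above.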

What I need in the end:

  1. the producer submits its tasks (around 5k each time) to the cluster,
  2. the broker dispatches N messages/requests to each consumer, where N is the number of concurrent tasks it can handle,
  3. when a single task is finished, the consumer replies to the broker/producer with the result,
  4. the producer receives the replies, updates the computation status and, in the end, prints some reports.
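Ignoring the AMQP transport, the four steps can be simulated end to end with stdlib queues standing in for the two channels (`task_q`, `reply_q`, and the worker setup are illustrative, not Pika API):

```python
import queue
import threading

N = 3                      # concurrent tasks one consumer can handle
task_q = queue.Queue()     # stands in for channel [A]
reply_q = queue.Queue()    # stands in for channel [B]

def worker():
    while True:
        task = task_q.get()
        if task is None:                   # poison pill: shut down
            break
        reply_q.put((task, task * task))   # "compute" and reply

# step 2: N workers model the broker dispatching N requests at once
workers = [threading.Thread(target=worker) for _ in range(N)]
for w in workers:
    w.start()

# step 1: the producer submits its tasks (5k in the question; 20 here)
tasks = list(range(20))
for t in tasks:
    task_q.put(t)
for _ in workers:
    task_q.put(None)
for w in workers:
    w.join()

# step 4: the producer collects replies; their order may differ from
# submission order, which is fine per the restrictions below
status = {}
while not reply_q.empty():
    task, result = reply_q.get()
    status[task] = result
```

Every task gets exactly one reply, even though the replies arrive out of order.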

Restrictions:

  • If another user submits work, all of his tasks will be queued after the previous user's (I guess this follows automatically from the queue system, but I haven't thought about the implications in a threaded environment)
  • Tasks have an order to be submitted, but the order they are replied is not important

UPDATE

I have studied this a bit further, and my actual problem seems to be that I use a simple function as the callback for Pika's SelectConnection.channel.basic_consume(). My last (unimplemented) idea is to pass a threaded function instead of a regular one, so the callback would not block and the consumer could keep listening.
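That idea, having the callback hand the message off to a thread so the consumer keeps listening, can be sketched with a thread pool. The names `on_request`, `compute`, and `publish_reply` are made up for illustration; in real Pika code the reply must be published from the connection's own thread, since Pika channels are not thread-safe (Pika 1.x provides `add_callback_threadsafe` for that hop):

```python
import queue
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)   # match your prefetch_count
replies = queue.Queue()

def compute(body):
    return body.upper()            # stand-in for the real work

def publish_reply(future):
    # In real code, hop back to the connection thread before touching
    # the channel (e.g. via add_callback_threadsafe in Pika 1.x).
    replies.put(future.result())

def on_request(body):
    # The callback returns immediately; the work runs in the pool,
    # so the consumer loop keeps receiving further messages.
    pool.submit(compute, body).add_done_callback(publish_reply)

for msg in ["a", "b", "c"]:
    on_request(msg)
pool.shutdown(wait=True)
```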


3 Answers

善良天后 2024-12-11 11:53:13

As you have noticed, your process blocks when it runs a callback. There are several ways to deal with this depending on what your callback does.

If your callback is IO-bound (doing lots of network or disk IO) you can use either threads or a greenlet-based solution, such as gevent, eventlet, or greenhouse. Keep in mind, though, that Python is limited by the GIL (Global Interpreter Lock), which means that only one piece of Python code is ever running in a single Python process. This means that if you are doing lots of computation in Python code, these solutions will likely not be much faster than what you already have.

Another option would be to implement your consumer as multiple processes using multiprocessing. I have found multiprocessing very useful when doing parallel work. You could implement this either by using a Queue, with the parent process acting as the consumer and farming out work to its children, or by simply starting multiple processes, each of which consumes on its own. I would suggest, unless your application is highly concurrent (thousands of workers), simply starting multiple workers, each of which consumes from its own connection. That way you can use the acknowledgement feature of AMQP, so if a consumer dies while still processing a task, the message is automatically sent back to the queue and picked up by another worker, rather than the request simply being lost.
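Why per-worker connections plus acknowledgements prevent lost requests can be shown with a toy in-memory broker (the `ToyBroker` class below is purely illustrative, not an AMQP or Pika API): a message stays unacked until the worker confirms it, and a dead worker's message is requeued rather than lost:

```python
import collections

class ToyBroker:
    """Illustrative stand-in for an AMQP queue with acknowledgements."""
    def __init__(self, messages):
        self.ready = collections.deque(messages)
        self.unacked = {}
        self.next_tag = 0

    def deliver(self):
        msg = self.ready.popleft()
        self.next_tag += 1
        self.unacked[self.next_tag] = msg   # held until acked
        return self.next_tag, msg

    def ack(self, tag):
        del self.unacked[tag]               # work confirmed done

    def consumer_died(self, tag):
        # Like AMQP: unacked messages go back to the queue, not lost.
        self.ready.append(self.unacked.pop(tag))

broker = ToyBroker(["job1", "job2"])
done = []

tag1, msg1 = broker.deliver()
broker.consumer_died(tag1)        # worker crashed mid-task: requeued

tag2, msg2 = broker.deliver()     # job2 goes to a healthy worker
done.append(msg2); broker.ack(tag2)
tag3, msg3 = broker.deliver()     # job1 is redelivered
done.append(msg3); broker.ack(tag3)
```

Both jobs complete despite the crash; without the ack step, job1 would have vanished with the dead worker.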

A last option, if you control the producer and it is also written in Python, is to use a task library like celery to abstract the task/queue workings for you. I have used celery on several large projects and have found it to be very well written. It will also handle the multiple-consumer issue for you with the appropriate configuration.

水染的天色ゝ 2024-12-11 11:53:13

Your setup sounds good to me. And you are right: you can simply set the callback to start a thread, and chain it to a separate callback that queues the response back over Channel B when the thread finishes.

Basically, your consumers should have a queue of their own (of size N, the amount of parallelism they support). When a request comes in via Channel A, it should be stored in the queue shared between Pika's main thread and the worker threads in the thread pool. As soon as it is queued, Pika should respond with an ACK, and a worker thread will wake up and start processing.

Once a worker is done with its work, it queues the result on a separate result queue and issues a callback to the main thread, which sends it back to the producer.

You should take care and make sure that the worker threads are not interfering with each other if they are using any shared resources, but that's a separate topic.
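A minimal sketch of that arrangement with stdlib pieces, under the assumption that a bounded queue of size N models the consumer's own queue and the `acked` list merely stands in for `basic_ack`:

```python
import queue
import threading

N = 2
work_q = queue.Queue(maxsize=N)   # the consumer's own queue, size N
result_q = queue.Queue()          # results headed back over Channel B
acked = []

def worker():
    while True:
        item = work_q.get()
        if item is None:          # poison pill: shut down
            break
        result_q.put(item + 1)    # stand-in computation

threads = [threading.Thread(target=worker) for _ in range(N)]
for t in threads:
    t.start()

for request in [10, 20, 30, 40]:
    work_q.put(request)           # blocks while N tasks are in flight
    acked.append(request)         # "ACK" as soon as it is queued

for _ in threads:
    work_q.put(None)
for t in threads:
    t.join()

results = sorted(result_q.get_nowait() for _ in range(4))
```

The bounded `work_q` is what enforces the parallelism limit: the main thread cannot accept (and ack) a fifth request until a worker frees a slot.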

桃扇骨 2024-12-11 11:53:13

Being inexperienced in threading, my setup would run multiple consumer processes (the number of which would basically be your prefetch count). Each would connect to the two queues, and they would happily process jobs, unaware of each other's existence.
