Can eventlet manage an AMQP connection, with asynchronous incoming and outgoing messages?
Actual Design:
For those returning to this question, the helpful answer below pushed me towards a workable design that is running fine. Three insights were key:
- Eventlet is a very safe environment: if two greenlets both try to `recv()` from, or both try to `send()` to, the same socket simultaneously, then Eventlet elegantly kills the second greenlet with an exception. This is magnificent, and it means that if `amqplib` "greens" poorly, the result will be simple exceptions rather than impossible-to-reproduce data-interleaving errors.
- The `amqplib` methods fall roughly into two groups: `wait()` loops inside of `recv()` until an AMQP message is assembled, while the other methods `send()` messages back and never attempt a `recv()` of their own. This is stunningly good luck, given that the `amqplib` authors had no idea that someone would try to "green" their library! It means that message sending is safe not only from the callback invoked by `wait()`, but also from other greenlets entirely outside the control of the `wait()` loop. These safe methods, which can be called from any greenlet and not just from the `wait()` callback, are:
  - `basic_ack`
  - `basic_consume` with `nowait=True`
  - `basic_publish`
  - `basic_recover`
  - `basic_reject`
  - `exchange_declare` with `nowait=True`
  - `exchange_delete` with `nowait=True`
  - `queue_bind` with `nowait=True`
  - `queue_unbind` with `nowait=True`
  - `queue_declare` with `nowait=True`
  - `queue_delete` with `nowait=True`
  - `queue_purge` with `nowait=True`
- Semaphores can be used as locks: initialize the semaphore with the count `1`, then `acquire()` and `release()` it to lock and unlock. All of my async greenlets that want to write messages can use such a lock to keep their separate `send()` calls from interleaving and ruining the AMQP protocol.
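Since `eventlet.semaphore.Semaphore` deliberately mirrors the `acquire()`/`release()`/context-manager protocol of the standard library's `threading.Semaphore`, the count-1-semaphore-as-lock pattern can be sketched with the stdlib alone; the `send_frame` helper and `wire` list here are illustrative stand-ins for a socket write:

```python
import threading

# A semaphore initialized with count 1 acts as a mutual-exclusion
# lock: the first acquire() succeeds, and a second blocks until
# release(). eventlet.semaphore.Semaphore(1) exposes this same API.
write_lock = threading.Semaphore(1)

wire = []  # stands in for the one shared AMQP socket

def send_frame(frame):
    # "with" calls acquire() on entry and release() on exit,
    # so only one writer touches the wire at a time.
    with write_lock:
        wire.append(frame)

threads = [threading.Thread(target=send_frame, args=(n,)) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The same `with write_lock:` form works unchanged on an eventlet semaphore, which is what the code below relies on.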
So my code looks roughly like this:
```python
import eventlet

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        self.write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(self):
        # start this using eventlet.spawn_n();
        # create the Connection and self.channel here
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(self, *args, **kw):
        with self.write_lock:  # yes, Eventlet supports this!
            self.channel.basic_publish(*args, **kw)

    def consume(self, message):
        # returning immediately frees up the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(self, message):
        # do whatever I want with the message;
        # whenever I am done, I can reply asynchronously:
        self.safe_publish(...)
```
Enjoy!
Original Question:
Imagine hundreds of AMQP messages arriving each minute at a small Python Eventlet application, each of which needs to be processed and answered. The CPU overhead of the processing will be minimal, but it might involve waiting on answers from other services and sockets.
To allow, say, 100 messages to be processed at once, I could of course spin up 100 separate TCP connections to RabbitMQ and have a worker for each connection that receives, processes, and answers single messages in lock-step. But to conserve TCP connections I would prefer to create just one AMQP connection, allow RabbitMQ to stream messages down the pipe at me at full speed, hand those tasks off to workers, and send answers back when each worker completes:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
Observe that:
- An Eventlet queue could elegantly distribute incoming work among the workers as they become available for more work.
- Flow control from RabbitMQ might even be possible: I can ACK messages only until my workers are all busy, and then wait before sending further ACKs until the queue starts to empty.
- Work will be almost certainly completed out-of-order: one request might finish quickly while another event that arrived earlier takes much longer; and some requests might never complete at all; so the workers will be handing back responses in an unpredictable and asynchronous order.
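The queue-based fan-out in the first observation can be sketched with the stdlib, since `eventlet.queue.Queue` shares the `put()`/`get()` interface of `queue.Queue`; the worker count, message values, and sentinel convention here are all illustrative:

```python
import queue
import threading

tasks = queue.Queue()    # dispatcher -> workers
results = queue.Queue()  # workers -> dispatcher

def worker():
    # Each worker pulls the next message as soon as it is free,
    # so answers come back in whatever order work finishes.
    while True:
        msg = tasks.get()
        if msg is None:          # sentinel: no more work
            break
        results.put('reply-to-%d' % msg)

workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()

for msg in range(6):             # messages streaming in from RabbitMQ
    tasks.put(msg)
for _ in workers:                # one sentinel per worker
    tasks.put(None)
for w in workers:
    w.join()
```

With eventlet, the threads become greenlets spawned with `eventlet.spawn_n()`, but the queue discipline is the same.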
I had been planning to write this using Eventlet and py-amqplib after seeing this attractive blog post about how easily that AMQP library could be pulled into the Eventlet processing model:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
My problem is that, having read the documentation for both libraries, the amqplib source code, and much of the Eventlet source code, I cannot figure out how I can teach the eventlet that owns the AMQP connection — the eventlet named connect_to_host()
in the blog post — to also wake up when a worker completes its work and generates an answer. The wait()
method in amqplib can only be awoken through activity on the AMQP socket. Though it feels like I ought to be able to have the workers write their answers to a queue, and have the connect_to_host()
eventlet wake up either when a new incoming message arrives or when a worker is ready with an answer to send, I cannot find any way for an eventlet to say “wake me when either of these things happens.”
It did occur to me that the workers could try commandeering the AMQP connection object — or even the raw socket — and writing their own messages back over TCP; but it seems as though locks would be necessary to prevent the outgoing worker messages from getting interleaved with each other or with ACK messages written by the main listener eventlet, and I cannot find where locks are available in Eventlet either.
All of this makes me feel almost certain that I am trying to tackle this problem somehow exactly backwards. Does a problem like this — letting a single connection be safely shared between a listener-dispatcher and many workers — simply not map to a coroutine model, and require a full-fledged async library? (In which case: is there one you would recommend for this problem, and how would the multiplexing take place between incoming messages and outgoing worker responses? I found no clean solution earlier today trying combinations like Pika + ioloop — though I have just seen another library, stormed_amqp, that might do better than Pika did.) Or do I actually need to fall back on real live Python threads if I want clean and maintainable code that can enact this model? I am open to all options.
Thanks for any help or ideas! I keep thinking that I have the whole concurrency-in-Python thing pretty much down, then I learn yet again that I do not. :) And I hope you liked the ASCII art above in any case.
Answer:
After reading your post and working with gevent, a library similar to eventlet, a few things became clear to me, because I just solved a similar problem.
In general there is no need for locking, since only one eventlet or greenlet is ever running at a time; as long as none of them block, everything appears to run concurrently. BUT you don't want to send data down a socket while another greenlet is sending too. You are right: that does need a lock.
If I have questions like these, looking in the documentation is not enough; go look in the source! It's open source anyway, and you learn a ton more by reading other people's code.
Here is some simplified example code that might clear things up for you: in your dispatcher, have two queues, and have workers put their results on the server queue; the sending and receiving code then services those queues, with the lock taken in your connection code's send function.
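The answer's actual code samples did not survive on this page, so the following is only a sketch of the shape the text describes, combining the two queues with a locked send function; every name here (`client_queue`, `server_queue`, `send`, `wire`) is illustrative, and the stdlib `queue`/`threading` modules stand in for their eventlet/gevent equivalents:

```python
import queue
import threading

client_queue = queue.Queue()   # incoming messages, dispatcher -> workers
server_queue = queue.Queue()   # finished results, workers -> sender
send_lock = threading.Semaphore(1)
wire = []                      # stands in for the one shared socket

def send(payload):
    # The lock lives in the connection's send function, so writes are
    # serialized no matter which greenlet/thread calls send().
    with send_lock:
        wire.append(payload)

def worker():
    while True:
        msg = client_queue.get()
        if msg is None:                      # sentinel: shut down
            break
        server_queue.put('answer:%s' % msg)  # result onto the server queue

def sender(expected):
    # Drains the server queue and pushes answers down the wire.
    for _ in range(expected):
        send(server_queue.get())

workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()
out = threading.Thread(target=sender, args=(4,))
out.start()

for msg in 'abcd':             # messages arriving from the broker
    client_queue.put(msg)
for _ in workers:
    client_queue.put(None)
for w in workers:
    w.join()
out.join()
```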