How should a ZeroMQ worker safely "hang up"?
I started using ZeroMQ this week, and when using the Request-Response pattern I am not sure how to have a worker safely "hang up" and close his socket without possibly dropping a message and causing the customer who sent that message to never get a response. Imagine a worker written in Python who looks something like this:
import zmq
c = zmq.Context()
s = c.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')
for i in range(8):    # handle eight requests, then quit
    s.recv()          # wait for the next request
    s.send(b'reply')  # answer it
s.close()
I have been doing experiments and have found that a customer at 127.0.0.1:9999 of socket type zmq.REQ who makes a fair-queued request just might have the misfortune of having the fair-queuing algorithm choose the above worker right after the worker has done its last send() but before it runs the following close() method. In that case, it seems that the request is received and buffered by the ØMQ stack in the worker process, and that the request is then lost when close() throws out everything associated with the socket.
How can a worker detach "safely" — is there any way to signal "I don't want messages anymore", then (a) loop over any final messages that have arrived during transmission of the signal, (b) generate their replies, and then (c) execute close() with the guarantee that no messages are being thrown away?
Edit: I suppose the raw state that I would want to enter is a "half-closed" state, where no further requests could be received — and the sender would know that — but where the return path is still open so that I can check my incoming buffer for one last arrived message and respond to it if there is one sitting in the buffer.
Edit: In response to a good question, corrected the description to make the number of waiting messages plural, as there could be many connections waiting on replies.
6 Answers
There is a conflict of interest between sending requests as rapidly as possible to workers, and getting reliability in case a worker crashes or dies. There is an entire section of the ZeroMQ Guide that explains different answers to this question of reliability. Read that, it'll help a lot.
tl;dr workers can/will crash and clients need a resend functionality. The Guide provides reusable code for that, in many languages.
Wouldn't the simplest solution be to have the customer timeout when waiting for the reply and then retry if no reply is received?
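For illustration, a minimal sketch of that idea in Python (the endpoint, timeout, and retry count are my assumptions, not part of the answer). Because a REQ socket refuses to send again until it has received a reply, the client abandons the stuck socket on timeout and retries on a fresh one, much like the Guide's "Lazy Pirate" pattern:

import zmq

REQUEST_TIMEOUT_MS = 2500            # assumed time to wait for a reply
REQUEST_RETRIES = 3                  # assumed number of attempts
ENDPOINT = 'tcp://127.0.0.1:9999'

context = zmq.Context()

def request_with_retry(payload):
    for attempt in range(REQUEST_RETRIES):
        client = context.socket(zmq.REQ)
        client.connect(ENDPOINT)
        client.send(payload)
        if client.poll(REQUEST_TIMEOUT_MS) & zmq.POLLIN:
            reply = client.recv()
            client.close()
            return reply
        # No reply in time: discard this socket (LINGER 0 drops the pending
        # request instead of blocking) and try again on a fresh one.
        client.setsockopt(zmq.LINGER, 0)
        client.close()
    return None                      # give up after the last retry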
Try sleeping before the call to close. This is fixed in 2.1 but not in 2.0 yet.
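As a rough sketch of that workaround (the one-second delay is arbitrary, and the ZMQ_LINGER comment refers to the 2.1 behaviour this answer alludes to), the worker from the question would pause long enough for the I/O thread to flush the final reply before the socket is destroyed:

import time
import zmq

c = zmq.Context()
s = c.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')
for i in range(8):
    s.recv()
    s.send(b'reply')
time.sleep(1)    # 2.0 workaround: let the I/O thread flush the last reply
s.close()        # from 2.1 on, the ZMQ_LINGER socket option governs this instead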
You seem to think that you are trying to avoid a "simple" race condition, but I think the problem is that fair queuing (round-robin) makes things even more difficult: you might already even have several queued requests on your worker. The sender will not wait for your worker to be free before sending a new request if it is its turn to receive one, so at the time you call zmq_send other requests might be waiting already.

In fact, it looks like you might have selected the wrong data direction. Instead of having a requests pool send requests to your workers (even when you would prefer not to receive new ones), you might want to have your workers fetch a new request from a requests queue, take care of it, then send the answer. Of course, it means using XREP/XREQ, but I think it is worth it.

Edit: I wrote some code implementing the other direction to explain what I mean.
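That code is not reproduced here, but a minimal sketch of the reversed direction might look like the following (the endpoint, the READY handshake, and the single-part framing are my assumptions; DEALER is the newer name for XREQ):

import zmq

context = zmq.Context()

# Reverse the flow: the worker pulls work from a queue instead of having
# requests pushed at it by fair queuing.
worker = context.socket(zmq.DEALER)        # DEALER == XREQ in older releases
worker.connect('tcp://127.0.0.1:9998')     # back end of a hypothetical XREP queue

for i in range(8):
    worker.send(b'READY')                  # explicitly ask the queue for one request
    request = worker.recv()                # the queue hands over exactly one job
    worker.send(b'reply to ' + request)    # route the answer back through the queue

# No READY followed the last reply, so the queue has no reason to send another
# request here; closing now cannot strand a buffered message at the worker.
worker.close()
context.term()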
I think the problem is that your messaging architecture is wrong. Your workers should use a REQ socket to send a request for work and that way there is only ever one job queued at the worker. Then to acknowledge completion of the work, you could either use another REQ request that doubles as ack for the previous job and request for a new one, or you could have a second control socket.
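A minimal sketch of that arrangement (the broker endpoint and message framing are my assumptions): the worker asks for work over REQ, so at most one job is ever queued at the worker, and each new request doubles as the ack for the job it just finished:

import zmq

context = zmq.Context()

worker = context.socket(zmq.REQ)
worker.connect('tcp://127.0.0.1:9998')   # hypothetical job-dispatching broker

ack = b'ready'                           # the very first request carries no result
while True:
    worker.send(ack)                     # "previous job done (if any); send another"
    job = worker.recv()                  # the broker replies with exactly one job
    ack = b'done: ' + job                # ... do the real work here ...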
Some people do this using PUB/SUB for the control so that each worker publishes acks and the master subscribes to them.
You have to remember that with ZeroMQ there are 0 message queues. None at all! Just messages buffered in either the sender or receiver depending on settings like High Water Mark, and type of socket. If you really do need message queues then you need to write a broker app to handle that, or simply switch to AMQP where all communication is through a 3rd party broker.
I've been thinking about this as well. You may want to implement a CLOSE message which notifies the customer that the worker is going away. You could then have the worker drain for a period of time before shutting down. Not ideal, of course, but might be workable.
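A rough sketch of the drain idea (the drain window is arbitrary, and the CLOSE notification itself would travel over some separate channel that is not shown): after announcing that it is going away, the worker keeps answering whatever was already routed to it and only closes once the socket has been quiet for a full drain window:

import zmq

DRAIN_MS = 1000                       # assumed quiet period before giving up

context = zmq.Context()
s = context.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')

# ... the normal request/reply loop runs here until the worker decides to leave,
# at which point the CLOSE notice would go out to customers on another channel ...

# Drain: answer anything already queued toward this worker, and stop only once
# nothing new has arrived for DRAIN_MS.
while s.poll(DRAIN_MS) & zmq.POLLIN:
    s.recv()
    s.send(b'reply')

s.close()
context.term()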