使用 PyZMQ 限制队列长度
我想限制 Python 应用程序中 ZeroMQ 消息队列消耗的内存量。我知道设置高水位线会限制发送方排队的数量,但是有没有办法控制接收方排队的数量? Python ZeroMQ 绑定似乎将其设置为无限制。
我的测试场景:我有两个用于测试的 python 终端。一个是接收方:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")
另一个是发送方:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
... print num
... socket.send(str(num))
... num = num + 1
...
我在接收方运行了几次 socket.recv()
以确保队列正常工作,但除此之外,让两个终端坐在那里。发送循环似乎永远不会阻塞,并且接收提示似乎内存占用不断增加。
I want to limit the amount of memory consumed by my ZeroMQ message queues in a Python application. I know that setting the high-water mark will limit the amount that will be queued on the sender side, but is there a way to control how much will be queued on the receiver side? The Python ZeroMQ binding seems to have it set at unlimited.
My test scenario: I have two python terminals that I am using for testing. One is the receiver:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")
The other is the sender:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
... print num
... socket.send(str(num))
... num = num + 1
...
I run socket.recv()
on the receiver side a couple times to make sure that the queue works, but other than that, let the two terminals just sit there. The send loop seems to never block and the receive prompt seems to have a growing memory footprint.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
与 ZeroMQ 的文档相矛盾的是,需要在
PUSH
端和PULL
端设置高水位线。一旦我改变了PULL
,它的效果就更好了。新的PULL
代码是:In contradiction to the documentation of ZeroMQ, the high water mark needs to be set on both the
PUSH
side and thePULL
side. Once I changed thePULL
, it worked better. The newPULL
code is:通过
zmq.SNDBUF
和zmq.RCVBUF
选项,您可以设置缓冲区限制大小。另外,我在接收方使用
zmq.CONFLATE
选项将 ZeroMQ 队列大小限制为 1:这是 ZMQ
PUSH/PULL
的示例:发送方 (
zmq.PUSH
):Getter (
zmq.PULL
):By
zmq.SNDBUF
andzmq.RCVBUF
options you could set a limit on buffer size.Also, I use
zmq.CONFLATE
option in the reciever side to limit the ZeroMQ queue size to just one:Here's an example with ZMQ
PUSH/PULL
:Sender (
zmq.PUSH
):Getter (
zmq.PULL
):事实上,文档是这样说的:
http://api.zeromq.org/2-1:zmq-socket< /a>
这直接表明您可以(并且应该)为下游节点(也称为拉)设置高水位线,并且可能意味着将其设置在推侧将不会产生任何效果(尽管我怀疑这不是真的,因为有依然如此其中下游节点可用,但消息传入的速度比发送的速度快。)
Actually, the documentation says this:
http://api.zeromq.org/2-1:zmq-socket
Which outright states that you can (and should) set the high water mark for downstream nodes (aka pull), and perhaps implies that setting it on the push side will have no effect (although I suspect that's not true, because there is still the case where downstream nodes are available but messages are coming in faster than they can be sent.)