使用 PyZMQ 限制队列长度

发布于 2025-01-07 19:04:21 字数 1247 浏览 0 评论 0原文

我想限制 Python 应用程序中 ZeroMQ 消息队列消耗的内存量。我知道设置高水位线会限制发送方排队的数量,但是有没有办法控制接收方排队的数量? Python ZeroMQ 绑定似乎将其设置为无限制。

我的测试场景:我有两个用于测试的 python 终端。一个是接收方:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")

另一个是发送方:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
...  print num
...  socket.send(str(num))
...  num = num + 1
... 

我在接收方运行了几次 socket.recv() 以确保队列正常工作,但除此之外,让两个终端坐在那里。发送循环似乎永远不会阻塞,并且接收提示似乎内存占用不断增加。

I want to limit the amount of memory consumed by my ZeroMQ message queues in a Python application. I know that setting the high-water mark will limit the amount that will be queued on the sender side, but is there a way to control how much will be queued on the receiver side? The Python ZeroMQ binding seems to have it set at unlimited.

My test scenario: I have two python terminals that I am using for testing. One is the receiver:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")

The other is the sender:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
...  print num
...  socket.send(str(num))
...  num = num + 1
... 

I run socket.recv() on the receiver side a couple times to make sure that the queue works, but other than that, let the two terminals just sit there. The send loop seems to never block and the receive prompt seems to have a growing memory footprint.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

李白 2025-01-14 19:04:21

与 ZeroMQ 的文档相矛盾的是,需要在 PUSH 端和 PULL 端设置高水位线。一旦我改变了PULL,它的效果就更好了。新的PULL代码是:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")

In contradiction to the documentation of ZeroMQ, the high water mark needs to be set on both the PUSH side and the PULL side. Once I changed the PULL, it worked better. The new PULL code is:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")
苏辞 2025-01-14 19:04:21

通过 zmq.SNDBUFzmq.RCVBUF 选项,您可以设置缓冲区限制大小。


另外,我在接收方使用 zmq.CONFLATE 选项将 ZeroMQ 队列大小限制为 1:

这是 ZMQ PUSH/PULL 的示例:

发送方 (zmq.PUSH):

def create_pub_socket(ip, port):
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 1)
        zmq_address = "tcp://{}:{}".format(ip, port)
        socket.connect(zmq_address)
        return socket

    except zmq.ZMQError as exp:
        print(exp)
        return False

sock = create_push_socket('127.0.0.1', 5558)
if sock:
    sock.send_json({'a': 1})

Getter (zmq.PULL):

def listen(self):
    sock = None
    try:
        context = zmq.Context()
        sock = context.socket(zmq.PULL)
        sock.setsockopt(zmq.RCVHWM, 1)
        sock.setsockopt(zmq.CONFLATE, 1)  # last msg only.
        sock.bind("tcp://*:5558")

    except zmq.ZMQError:
        logger.captureException()

    configs = None
    while configs is None:
        if sock:
            configs = sock.recv_json()
            time.sleep(1e-1)
        else:
            time.sleep(5)
            listen()  # Recursive.
listen()

By zmq.SNDBUF and zmq.RCVBUF options you could set a limit on buffer size.


Also, I use zmq.CONFLATE option in the reciever side to limit the ZeroMQ queue size to just one:

Here's an example with ZMQ PUSH/PULL:

Sender (zmq.PUSH):

def create_pub_socket(ip, port):
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 1)
        zmq_address = "tcp://{}:{}".format(ip, port)
        socket.connect(zmq_address)
        return socket

    except zmq.ZMQError as exp:
        print(exp)
        return False

sock = create_push_socket('127.0.0.1', 5558)
if sock:
    sock.send_json({'a': 1})

Getter (zmq.PULL):

def listen(self):
    sock = None
    try:
        context = zmq.Context()
        sock = context.socket(zmq.PULL)
        sock.setsockopt(zmq.RCVHWM, 1)
        sock.setsockopt(zmq.CONFLATE, 1)  # last msg only.
        sock.bind("tcp://*:5558")

    except zmq.ZMQError:
        logger.captureException()

    configs = None
    while configs is None:
        if sock:
            configs = sock.recv_json()
            time.sleep(1e-1)
        else:
            time.sleep(5)
            listen()  # Recursive.
listen()
半窗疏影 2025-01-14 19:04:21

事实上,文档是这样说的:

“当 ZMQ_PUSH 套接字由于具有
达到所有下游节点的高水位线,或者如果有
根本没有下游节点,则任何 zmq_send(3) 操作
套接字应阻塞,直到异常状态结束或至少一个
下游节点可用于发送;消息不是
被丢弃。”

http://api.zeromq.org/2-1:zmq-socket< /a>

这直接表明您可以(并且应该)为下游节点(也称为拉)设置高水位线,并且可能意味着将其设置在推侧将不会产生任何效果(尽管我怀疑这不是真的,因为有依然如此其中下游节点可用,但消息传入的速度比发送的速度快。)

Actually, the documentation says this:

"When a ZMQ_PUSH socket enters an exceptional state due to having
reached the high water mark for all downstream nodes, or if there are
no downstream nodes at all, then any zmq_send(3) operations on the
socket shall block until the exceptional state ends or at least one
downstream node becomes available for sending; messages are not
discarded."

http://api.zeromq.org/2-1:zmq-socket

Which outright states that you can (and should) set the high water mark for downstream nodes (aka pull), and perhaps implies that setting it on the push side will have no effect (although I suspect that's not true, because there is still the case where downstream nodes are available but messages are coming in faster than they can be sent.)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文