如何设置并行消费者?

发布于 2025-01-15 07:44:27 字数 4333 浏览 3 评论 0原文

我想并行处理具有多个消费者的队列。但我的方法行不通。无论我使用多少个消费者,它们似乎都会互相阻塞,并且处理输入队列总是花费相同的时间,无论消费者的数量有多少:

import argparse
import json
import time

import pika


def make_connection() -> pika.BlockingConnection:
    """Open a blocking pika connection to the broker at ``localhost:5672``.

    Authenticates with the plain username/password pair hard-coded below.

    Returns:
        The newly opened ``pika.BlockingConnection``.
    """
    return pika.BlockingConnection(
        parameters=pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials(username="user", password="bitnami"),
        )
    )


def publish(channel: pika.adapters.blocking_connection.BlockingChannel, data):
    """Publish ``data`` to the ``in`` queue as a JSON envelope.

    The body sent is the encoded JSON object ``{"data": data}``; it is
    published with an empty exchange name and the routing key ``in``.
    """
    payload = json.dumps({"data": data}).encode()
    channel.basic_publish("", "in", payload)


def make_inp_callback(marker: str):
    """Build a consumer callback for the ``in`` queue, tagged with ``marker``.

    Each invocation of the returned callback:

    1. sleeps 0.2 seconds (stand-in for real work),
    2. appends ``" consumer <marker>"`` to the incoming ``data`` field,
    3. prints the result prefixed with ``marker``,
    4. publishes the result to the ``out`` queue, and
    5. acknowledges the input message.
    """
    def inp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        # Simulated processing time; this blocks the calling thread.
        time.sleep(.2)
        incoming = json.loads(body)
        result = {"data": incoming["data"] + " consumer " + marker}
        print(f"{marker}: {result}")
        channel.basic_publish("", "out", json.dumps(result).encode())
        # Ack only after the result has been handed to the channel.
        channel.basic_ack(method.delivery_tag)

    return inp_callback


def make_outp_callback(n):
    """Build the consumer callback for the ``out`` queue.

    The returned callback prints every decoded message, acknowledges it, and
    closes the channel once it sees the message whose leading token (the text
    before the first space of ``data``) equals ``str(n - 1)``.

    NOTE: the stop condition fires on the message with index ``n - 1``, so it
    only marks the end of the run if that message is delivered last.
    """
    def outp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        payload = json.loads(body)
        print(payload)
        channel.basic_ack(method.delivery_tag)

        # The original message index is the first space-separated token.
        index_token, _, _ = payload["data"].partition(" ")
        if index_token != str(n - 1):
            return
        channel.close()

    return outp_callback


def set_up_consumers(channel, n):
    """Register ``n`` consumers on the ``in`` queue of ``channel``.

    Consumer number ``i`` receives its own callback tagged ``"callback i"``.
    All consumers are registered on the same channel that is passed in.
    """
    markers = (f"callback {i}" for i in range(n))
    for marker in markers:
        channel.basic_consume("in", make_inp_callback(marker))


def parse_args():
    """Parse the command line.

    Expects two positional integer arguments, in this order:
    ``n_consumers`` and ``n_messages``.

    Returns:
        The ``argparse.Namespace`` holding both values as ``int``.
    """
    parser = argparse.ArgumentParser()
    for positional in ("n_consumers", "n_messages"):
        parser.add_argument(positional, type=int)
    return parser.parse_args()


def main():
    """Publish test messages, consume them and report the elapsed time.

    Steps:

    1. Parse ``n_consumers`` and ``n_messages`` from the command line.
    2. Open a connection and one channel, declare the ``in`` and ``out``
       queues and publish ``n_messages`` numbered messages to ``in``.
    3. Register ``n_consumers`` consumers on ``in`` plus one consumer on
       ``out``, all on the same channel, and start consuming.
    4. Print the wall-clock time once consuming has finished.

    The connection is always closed on exit, including when an error is
    raised part-way through, so no broker connection is leaked.
    """
    args = parse_args()

    connection = make_connection()
    try:
        channel = connection.channel()
        # Allow up to 4 unacknowledged deliveries at a time.
        channel.basic_qos(prefetch_count=4)
        channel.queue_declare("in")
        channel.queue_declare("out")

        for i in map(str, range(args.n_messages)):
            publish(channel, i)

        # Timing covers consumer registration and all message processing.
        start = time.time()

        set_up_consumers(channel, args.n_consumers)

        channel.basic_consume("out", make_outp_callback(args.n_messages))

        # Returns after the output callback closes the channel.
        channel.start_consuming()

        print(
            f"time processing {args.n_messages} messages with {args.n_consumers} consumers: {time.time() - start}"
        )
    finally:
        # Release the broker connection even if something above failed.
        if connection.is_open:
            connection.close()


# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
-> % py multi_consumer_test.py 5 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
callback 1: {'data': '4 consumer callback 1'}
callback 1: {'data': '5 consumer callback 1'}
callback 1: {'data': '6 consumer callback 1'}
callback 1: {'data': '7 consumer callback 1'}
callback 2: {'data': '8 consumer callback 2'}
callback 2: {'data': '9 consumer callback 2'}
{'data': '0 consumer callback 0'}
{'data': '1 consumer callback 0'}
{'data': '2 consumer callback 0'}
{'data': '3 consumer callback 0'}
{'data': '4 consumer callback 1'}
{'data': '5 consumer callback 1'}
{'data': '6 consumer callback 1'}
{'data': '7 consumer callback 1'}
{'data': '8 consumer callback 2'}
{'data': '9 consumer callback 2'}
-> % py multi_consumer_test.py 1 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
{'data': '0 consumer callback 0'}
callback 0: {'data': '4 consumer callback 0'}
{'data': '1 consumer callback 0'}
callback 0: {'data': '5 consumer callback 0'}
{'data': '2 consumer callback 0'}
callback 0: {'data': '6 consumer callback 0'}
{'data': '3 consumer callback 0'}
callback 0: {'data': '7 consumer callback 0'}
{'data': '4 consumer callback 0'}
callback 0: {'data': '8 consumer callback 0'}
{'data': '5 consumer callback 0'}
callback 0: {'data': '9 consumer callback 0'}
{'data': '6 consumer callback 0'}
{'data': '7 consumer callback 0'}
{'data': '8 consumer callback 0'}
{'data': '9 consumer callback 0'}
time processing 10 messages with 1 consumers: 2.043266773223877

我的设置有什么错误?

I want to process a queue with multiple consumers in parallel. But my approach does not work. No matter how many consumers I use, they seem to block each other and processing the input queue always takes the same amount of time, regardless of the number of consumers:

import argparse
import json
import time

import pika


def make_connection() -> pika.BlockingConnection:
    """Open a blocking pika connection to the broker at ``localhost:5672``.

    Authenticates with the plain username/password pair hard-coded below.

    Returns:
        The newly opened ``pika.BlockingConnection``.
    """
    return pika.BlockingConnection(
        parameters=pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials(username="user", password="bitnami"),
        )
    )


def publish(channel: pika.adapters.blocking_connection.BlockingChannel, data):
    """Publish ``data`` to the ``in`` queue as a JSON envelope.

    The body sent is the encoded JSON object ``{"data": data}``; it is
    published with an empty exchange name and the routing key ``in``.
    """
    payload = json.dumps({"data": data}).encode()
    channel.basic_publish("", "in", payload)


def make_inp_callback(marker: str):
    """Build a consumer callback for the ``in`` queue, tagged with ``marker``.

    Each invocation of the returned callback:

    1. sleeps 0.2 seconds (stand-in for real work),
    2. appends ``" consumer <marker>"`` to the incoming ``data`` field,
    3. prints the result prefixed with ``marker``,
    4. publishes the result to the ``out`` queue, and
    5. acknowledges the input message.
    """
    def inp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        # Simulated processing time; this blocks the calling thread.
        time.sleep(.2)
        incoming = json.loads(body)
        result = {"data": incoming["data"] + " consumer " + marker}
        print(f"{marker}: {result}")
        channel.basic_publish("", "out", json.dumps(result).encode())
        # Ack only after the result has been handed to the channel.
        channel.basic_ack(method.delivery_tag)

    return inp_callback


def make_outp_callback(n):
    """Build the consumer callback for the ``out`` queue.

    The returned callback prints every decoded message, acknowledges it, and
    closes the channel once it sees the message whose leading token (the text
    before the first space of ``data``) equals ``str(n - 1)``.

    NOTE: the stop condition fires on the message with index ``n - 1``, so it
    only marks the end of the run if that message is delivered last.
    """
    def outp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        payload = json.loads(body)
        print(payload)
        channel.basic_ack(method.delivery_tag)

        # The original message index is the first space-separated token.
        index_token, _, _ = payload["data"].partition(" ")
        if index_token != str(n - 1):
            return
        channel.close()

    return outp_callback


def set_up_consumers(channel, n):
    """Register ``n`` consumers on the ``in`` queue of ``channel``.

    Consumer number ``i`` receives its own callback tagged ``"callback i"``.
    All consumers are registered on the same channel that is passed in.
    """
    markers = (f"callback {i}" for i in range(n))
    for marker in markers:
        channel.basic_consume("in", make_inp_callback(marker))


def parse_args():
    """Parse the command line.

    Expects two positional integer arguments, in this order:
    ``n_consumers`` and ``n_messages``.

    Returns:
        The ``argparse.Namespace`` holding both values as ``int``.
    """
    parser = argparse.ArgumentParser()
    for positional in ("n_consumers", "n_messages"):
        parser.add_argument(positional, type=int)
    return parser.parse_args()


def main():
    """Publish test messages, consume them and report the elapsed time.

    Steps:

    1. Parse ``n_consumers`` and ``n_messages`` from the command line.
    2. Open a connection and one channel, declare the ``in`` and ``out``
       queues and publish ``n_messages`` numbered messages to ``in``.
    3. Register ``n_consumers`` consumers on ``in`` plus one consumer on
       ``out``, all on the same channel, and start consuming.
    4. Print the wall-clock time once consuming has finished.

    The connection is always closed on exit, including when an error is
    raised part-way through, so no broker connection is leaked.
    """
    args = parse_args()

    connection = make_connection()
    try:
        channel = connection.channel()
        # Allow up to 4 unacknowledged deliveries at a time.
        channel.basic_qos(prefetch_count=4)
        channel.queue_declare("in")
        channel.queue_declare("out")

        for i in map(str, range(args.n_messages)):
            publish(channel, i)

        # Timing covers consumer registration and all message processing.
        start = time.time()

        set_up_consumers(channel, args.n_consumers)

        channel.basic_consume("out", make_outp_callback(args.n_messages))

        # Returns after the output callback closes the channel.
        channel.start_consuming()

        print(
            f"time processing {args.n_messages} messages with {args.n_consumers} consumers: {time.time() - start}"
        )
    finally:
        # Release the broker connection even if something above failed.
        if connection.is_open:
            connection.close()


# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
-> % py multi_consumer_test.py 5 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
callback 1: {'data': '4 consumer callback 1'}
callback 1: {'data': '5 consumer callback 1'}
callback 1: {'data': '6 consumer callback 1'}
callback 1: {'data': '7 consumer callback 1'}
callback 2: {'data': '8 consumer callback 2'}
callback 2: {'data': '9 consumer callback 2'}
{'data': '0 consumer callback 0'}
{'data': '1 consumer callback 0'}
{'data': '2 consumer callback 0'}
{'data': '3 consumer callback 0'}
{'data': '4 consumer callback 1'}
{'data': '5 consumer callback 1'}
{'data': '6 consumer callback 1'}
{'data': '7 consumer callback 1'}
{'data': '8 consumer callback 2'}
{'data': '9 consumer callback 2'}
-> % py multi_consumer_test.py 1 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
{'data': '0 consumer callback 0'}
callback 0: {'data': '4 consumer callback 0'}
{'data': '1 consumer callback 0'}
callback 0: {'data': '5 consumer callback 0'}
{'data': '2 consumer callback 0'}
callback 0: {'data': '6 consumer callback 0'}
{'data': '3 consumer callback 0'}
callback 0: {'data': '7 consumer callback 0'}
{'data': '4 consumer callback 0'}
callback 0: {'data': '8 consumer callback 0'}
{'data': '5 consumer callback 0'}
callback 0: {'data': '9 consumer callback 0'}
{'data': '6 consumer callback 0'}
{'data': '7 consumer callback 0'}
{'data': '8 consumer callback 0'}
{'data': '9 consumer callback 0'}
time processing 10 messages with 1 consumers: 2.043266773223877

What is the mistake in my setup?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

如痴如狂 2025-01-22 07:44:27

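Pika 不是线程安全的,其内部也不使用任何线程。因此,在同一个 BlockingConnection 上注册的所有回调,都会在调用 start_consuming() 的那个线程中依次执行,某个回调里的 time.sleep 会阻塞其他所有消费者。在这种情况下,使用多个消费者的正确方法是:为每个连接启动一个线程(Thread),并在各自的线程上进行消费。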

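Pika is not thread-safe, nor does it use any threads internally. As a result, every callback registered on a single BlockingConnection runs sequentially in the thread that calls start_consuming(), so the time.sleep in one callback blocks all the other consumers. The correct way to have multiple consumers in this case is to start one thread per connection and consume on each of those threads.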

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文