如何设置并行消费者?
我想用多个消费者并行处理一个队列,但我的方法行不通。无论我使用多少个消费者,它们似乎都会互相阻塞,处理输入队列所花的时间始终相同,与消费者的数量无关:
import argparse
import json
import time
import pika
def make_connection() -> pika.BlockingConnection:
    """Open a blocking pika connection to the broker at localhost:5672.

    Authenticates with the plain credentials ``user`` / ``bitnami``.

    Returns:
        The newly created ``pika.BlockingConnection``.
    """
    return pika.BlockingConnection(
        parameters=pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials(username="user", password="bitnami"),
        )
    )
def publish(channel: pika.adapters.blocking_connection.BlockingChannel, data):
    """Send ``data`` to ``in`` wrapped in a JSON object.

    The payload is ``{"data": data}`` serialized with ``json.dumps`` and
    encoded to bytes, then handed to ``channel.basic_publish`` with ``""``
    and ``"in"`` as the first two positional arguments.
    """
    payload = json.dumps({"data": data}).encode()
    channel.basic_publish("", "in", payload)
def make_inp_callback(marker: str):
    """Create a message callback that tags its output with ``marker``.

    The returned callback sleeps for 0.2 seconds, appends
    ``" consumer " + marker`` to the incoming message's ``data`` field,
    prints the result, publishes it as JSON to ``out`` and finally
    acknowledges the incoming delivery.
    """
    def inp_callback(
        channel: pika.adapters.blocking_connection.BlockingChannel,
        method,
        properties,
        body,
    ):
        # The sleep happens inside the callback, before publishing or acking.
        time.sleep(.2)
        incoming = json.loads(body)
        tagged = {"data": incoming["data"] + " consumer " + marker}
        print(f"{marker}: {tagged}")
        channel.basic_publish("", "out", json.dumps(tagged).encode())
        channel.basic_ack(method.delivery_tag)

    return inp_callback
def make_outp_callback(n):
    """Create the callback used for messages arriving on ``out``.

    The returned callback decodes each message as JSON, prints it and
    acknowledges it. When the first space-separated token of the message's
    ``data`` field equals ``str(n - 1)``, it closes the channel.
    """
    def outp_callback(
        channel: pika.adapters.blocking_connection.BlockingChannel,
        method,
        properties,
        body,
    ):
        decoded = json.loads(body)
        print(decoded)
        channel.basic_ack(method.delivery_tag)
        first_token = decoded["data"].split(" ")[0]
        if first_token != str(n - 1):
            return
        # This message carries the last index, so shut the channel down.
        channel.close()

    return outp_callback
def set_up_consumers(channel, n):
    """Register ``n`` callbacks for the ``in`` queue on ``channel``.

    Each callback is produced by ``make_inp_callback`` and labelled
    ``"callback <i>"`` for ``i`` in ``range(n)``. All of them are registered
    on the single ``channel`` object that is passed in.
    """
    labels = (f"callback {index}" for index in range(n))
    for label in labels:
        channel.basic_consume("in", make_inp_callback(label))
def parse_args():
    """Parse the two positional integer command-line arguments.

    Returns:
        An ``argparse.Namespace`` with integer attributes ``n_consumers``
        and ``n_messages``, in that positional order.
    """
    parser = argparse.ArgumentParser()
    for positional in ("n_consumers", "n_messages"):
        parser.add_argument(positional, type=int)
    return parser.parse_args()
def main():
    """Publish test messages, consume them, and print the elapsed time.

    Steps:
      1. Parse ``n_consumers`` and ``n_messages`` from the command line.
      2. Open a connection and a channel, set ``prefetch_count=4`` and
         declare the ``in`` and ``out`` queues.
      3. Publish the strings ``"0"`` .. ``str(n_messages - 1)`` to ``in``.
      4. Register ``n_consumers`` callbacks on ``in`` and one on ``out``,
         then call ``start_consuming()``; the ``out`` callback closes the
         channel when it sees the last message index.
      5. Print the wall-clock time measured from just before the consumers
         were registered.

    The connection is closed in a ``finally`` clause so it is released on
    both the normal and the error path.
    """
    args = parse_args()
    connection = make_connection()
    try:
        channel = connection.channel()
        channel.basic_qos(prefetch_count=4)
        channel.queue_declare("in")
        channel.queue_declare("out")
        for i in map(str, range(args.n_messages)):
            publish(channel, i)
        # Timing starts after publishing, so only consumption is measured.
        start = time.time()
        set_up_consumers(channel, args.n_consumers)
        channel.basic_consume("out", make_outp_callback(args.n_messages))
        channel.start_consuming()
        print(
            f"time processing {args.n_messages} messages with {args.n_consumers} consumers: {time.time() - start}"
        )
    finally:
        # Guard with is_open so a connection that already dropped does not
        # raise here and mask the original exception.
        if connection.is_open:
            connection.close()


if __name__ == "__main__":
    main()
-> % py multi_consumer_test.py 5 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
callback 1: {'data': '4 consumer callback 1'}
callback 1: {'data': '5 consumer callback 1'}
callback 1: {'data': '6 consumer callback 1'}
callback 1: {'data': '7 consumer callback 1'}
callback 2: {'data': '8 consumer callback 2'}
callback 2: {'data': '9 consumer callback 2'}
{'data': '0 consumer callback 0'}
{'data': '1 consumer callback 0'}
{'data': '2 consumer callback 0'}
{'data': '3 consumer callback 0'}
{'data': '4 consumer callback 1'}
{'data': '5 consumer callback 1'}
{'data': '6 consumer callback 1'}
{'data': '7 consumer callback 1'}
{'data': '8 consumer callback 2'}
{'data': '9 consumer callback 2'}
-> % py multi_consumer_test.py 1 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
{'data': '0 consumer callback 0'}
callback 0: {'data': '4 consumer callback 0'}
{'data': '1 consumer callback 0'}
callback 0: {'data': '5 consumer callback 0'}
{'data': '2 consumer callback 0'}
callback 0: {'data': '6 consumer callback 0'}
{'data': '3 consumer callback 0'}
callback 0: {'data': '7 consumer callback 0'}
{'data': '4 consumer callback 0'}
callback 0: {'data': '8 consumer callback 0'}
{'data': '5 consumer callback 0'}
callback 0: {'data': '9 consumer callback 0'}
{'data': '6 consumer callback 0'}
{'data': '7 consumer callback 0'}
{'data': '8 consumer callback 0'}
{'data': '9 consumer callback 0'}
time processing 10 messages with 1 consumers: 2.043266773223877
我的设置有什么错误?
I want to process a queue with multiple consumers in parallel. But my approach does not work. No matter how many consumers I use, they seem to block each other and processing the input queue always takes the same amount of time, regardless of the number of consumers:
import argparse
import json
import time
import pika
def make_connection() -> pika.BlockingConnection:
    """Open a blocking pika connection to the broker at localhost:5672.

    Authenticates with the plain credentials ``user`` / ``bitnami``.

    Returns:
        The newly created ``pika.BlockingConnection``.
    """
    return pika.BlockingConnection(
        parameters=pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials(username="user", password="bitnami"),
        )
    )
def publish(channel: pika.adapters.blocking_connection.BlockingChannel, data):
    """Send ``data`` to ``in`` wrapped in a JSON object.

    The payload is ``{"data": data}`` serialized with ``json.dumps`` and
    encoded to bytes, then handed to ``channel.basic_publish`` with ``""``
    and ``"in"`` as the first two positional arguments.
    """
    payload = json.dumps({"data": data}).encode()
    channel.basic_publish("", "in", payload)
def make_inp_callback(marker: str):
    """Create a message callback that tags its output with ``marker``.

    The returned callback sleeps for 0.2 seconds, appends
    ``" consumer " + marker`` to the incoming message's ``data`` field,
    prints the result, publishes it as JSON to ``out`` and finally
    acknowledges the incoming delivery.
    """
    def inp_callback(
        channel: pika.adapters.blocking_connection.BlockingChannel,
        method,
        properties,
        body,
    ):
        # The sleep happens inside the callback, before publishing or acking.
        time.sleep(.2)
        incoming = json.loads(body)
        tagged = {"data": incoming["data"] + " consumer " + marker}
        print(f"{marker}: {tagged}")
        channel.basic_publish("", "out", json.dumps(tagged).encode())
        channel.basic_ack(method.delivery_tag)

    return inp_callback
def make_outp_callback(n):
    """Create the callback used for messages arriving on ``out``.

    The returned callback decodes each message as JSON, prints it and
    acknowledges it. When the first space-separated token of the message's
    ``data`` field equals ``str(n - 1)``, it closes the channel.
    """
    def outp_callback(
        channel: pika.adapters.blocking_connection.BlockingChannel,
        method,
        properties,
        body,
    ):
        decoded = json.loads(body)
        print(decoded)
        channel.basic_ack(method.delivery_tag)
        first_token = decoded["data"].split(" ")[0]
        if first_token != str(n - 1):
            return
        # This message carries the last index, so shut the channel down.
        channel.close()

    return outp_callback
def set_up_consumers(channel, n):
    """Register ``n`` callbacks for the ``in`` queue on ``channel``.

    Each callback is produced by ``make_inp_callback`` and labelled
    ``"callback <i>"`` for ``i`` in ``range(n)``. All of them are registered
    on the single ``channel`` object that is passed in.
    """
    labels = (f"callback {index}" for index in range(n))
    for label in labels:
        channel.basic_consume("in", make_inp_callback(label))
def parse_args():
    """Parse the two positional integer command-line arguments.

    Returns:
        An ``argparse.Namespace`` with integer attributes ``n_consumers``
        and ``n_messages``, in that positional order.
    """
    parser = argparse.ArgumentParser()
    for positional in ("n_consumers", "n_messages"):
        parser.add_argument(positional, type=int)
    return parser.parse_args()
def main():
    """Publish test messages, consume them, and print the elapsed time.

    Steps:
      1. Parse ``n_consumers`` and ``n_messages`` from the command line.
      2. Open a connection and a channel, set ``prefetch_count=4`` and
         declare the ``in`` and ``out`` queues.
      3. Publish the strings ``"0"`` .. ``str(n_messages - 1)`` to ``in``.
      4. Register ``n_consumers`` callbacks on ``in`` and one on ``out``,
         then call ``start_consuming()``; the ``out`` callback closes the
         channel when it sees the last message index.
      5. Print the wall-clock time measured from just before the consumers
         were registered.

    The connection is closed in a ``finally`` clause so it is released on
    both the normal and the error path.
    """
    args = parse_args()
    connection = make_connection()
    try:
        channel = connection.channel()
        channel.basic_qos(prefetch_count=4)
        channel.queue_declare("in")
        channel.queue_declare("out")
        for i in map(str, range(args.n_messages)):
            publish(channel, i)
        # Timing starts after publishing, so only consumption is measured.
        start = time.time()
        set_up_consumers(channel, args.n_consumers)
        channel.basic_consume("out", make_outp_callback(args.n_messages))
        channel.start_consuming()
        print(
            f"time processing {args.n_messages} messages with {args.n_consumers} consumers: {time.time() - start}"
        )
    finally:
        # Guard with is_open so a connection that already dropped does not
        # raise here and mask the original exception.
        if connection.is_open:
            connection.close()


if __name__ == "__main__":
    main()
-> % py multi_consumer_test.py 5 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
callback 1: {'data': '4 consumer callback 1'}
callback 1: {'data': '5 consumer callback 1'}
callback 1: {'data': '6 consumer callback 1'}
callback 1: {'data': '7 consumer callback 1'}
callback 2: {'data': '8 consumer callback 2'}
callback 2: {'data': '9 consumer callback 2'}
{'data': '0 consumer callback 0'}
{'data': '1 consumer callback 0'}
{'data': '2 consumer callback 0'}
{'data': '3 consumer callback 0'}
{'data': '4 consumer callback 1'}
{'data': '5 consumer callback 1'}
{'data': '6 consumer callback 1'}
{'data': '7 consumer callback 1'}
{'data': '8 consumer callback 2'}
{'data': '9 consumer callback 2'}
-> % py multi_consumer_test.py 1 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
{'data': '0 consumer callback 0'}
callback 0: {'data': '4 consumer callback 0'}
{'data': '1 consumer callback 0'}
callback 0: {'data': '5 consumer callback 0'}
{'data': '2 consumer callback 0'}
callback 0: {'data': '6 consumer callback 0'}
{'data': '3 consumer callback 0'}
callback 0: {'data': '7 consumer callback 0'}
{'data': '4 consumer callback 0'}
callback 0: {'data': '8 consumer callback 0'}
{'data': '5 consumer callback 0'}
callback 0: {'data': '9 consumer callback 0'}
{'data': '6 consumer callback 0'}
{'data': '7 consumer callback 0'}
{'data': '8 consumer callback 0'}
{'data': '9 consumer callback 0'}
time processing 10 messages with 1 consumers: 2.043266773223877
What is the mistake in my setup?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问、参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定您的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
Pika 不是线程安全的,也不在内部使用任何线程。在这种情况下拥有多个使用者的正确方法是为每个连接启动一个 `Thread`,并在每个线程上进行消费。
Pika is not thread-safe, nor does it use any threading internally. The correct way to have multiple consumers in this case would be to start a `Thread` per connection and consume on each of those threads.