鼠兔在 celery 任务函数中发布
由于某些原因,我得到了一些使用 pika 在 celery 任务中发布消息的代码。 代码如下:
@app.task
async def test_celery():
with Rabbitmq_Helper() as connection:
channel = connection.channel()
channel.basic_qos(
prefetch_size=0, prefetch_count=0
)
exchange_name = 'test'
channel.exchange_declare(exchange=exchange_name, exchange_type='topic',
durable=True)
queue_name = "testtest"
properties = pika.BasicProperties(delivery_mode=2)
# body = msgpack.packb(clickhouse_utils.format_data(data, nodes), default=str)
for i in range(100):
body = "test data"
print("send msg")
channel.basic_publish(
exchange=exchange_name, routing_key=queue_name, body=body, properties=properties
)
在正式场景中,celery任务会延迟很多次。消费者函数将监视rabbitmq队列。
但在测试中,我发现消费者无法第一时间收到消息。并且芹菜可以发布队列中的消息。
所以我尝试调试,我像这样获取测试文件。但我发现芹菜任务延迟是正确的,但它无法发布队列中的消息。
发生了什么事?
in some reasons, I got some code using pika to publish message in a celery task.
code like this:
@app.task
async def test_celery():
with Rabbitmq_Helper() as connection:
channel = connection.channel()
channel.basic_qos(
prefetch_size=0, prefetch_count=0
)
exchange_name = 'test'
channel.exchange_declare(exchange=exchange_name, exchange_type='topic',
durable=True)
queue_name = "testtest"
properties = pika.BasicProperties(delivery_mode=2)
# body = msgpack.packb(clickhouse_utils.format_data(data, nodes), default=str)
for i in range(100):
body = "test data"
print("send msg")
channel.basic_publish(
exchange=exchange_name, routing_key=queue_name, body=body, properties=properties
)
In the formal scene, the celery task will delay many times. And a consumer func will watch the rabbitmq queue.
But in the test, I found the consumer can't receive the msg in the first time. And the celery can publish the msg in queue.
So I try to debug, I take the test file like this. But I found the celery task delay is right,but it can't publish the msg in queue.
what's happend?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论