rabbitmq pika连接错误
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
报以上错误
send.py
import pika
import sys
import time
import json
from pymongo import MongoClient
# 远程rabbitmq服务的配置信息
# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials,heartbeat=0))
channel = connection.channel()
channel.queue_declare(queue='ws_interface', durable=True)
for x in co.find({"state": 0}).limit(100000):
print(count)
count += 1
channel.basic_publish(
exchange='',
routing_key='ws_interface', # 写明将消息发送给队列balance
body=pub_dict, # 要发送的消息
properties=pika.BasicProperties(delivery_mode=2, ) # 设置消息持久化(持久化第二步),将要发送的消息的属性标记为2,表示该消息要持久化
) # 向消息队列发送一条消息
print(" [%s] Sent 'message!'")
# time.sleep(0.2)
connection.close() # 关闭消息队列服务的连接
receieve.py
import pika
import json
import time
import os
credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials, heartbeat=0))
connection.process_data_events()
channel = connection.channel()
channel.queue_declare(queue='ws_interface', durable=True)
# 消费成功的回调函数
def callback(ch, method, properties, body):
# 调用任务链,产生ES-log,便于统计和展示。这里mq的价值在于,可水平扩展,增加节点来增加消费速度
body = json.loads(body)
time.sleep(7)
print(" [%s] Received %r" % (time.time(), body))
# time.sleep(0.2)
# 具体逻辑
# 当工作者完成任务后,会反馈给rabbitmq
channel.basic_ack(delivery_tag=method.delivery_tag)
# 开始依次消费balance队列中的消息
channel.basic_consume(on_message_callback=callback, queue='ws_interface', auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 启动消费
简单的说就是send 10w个自增id ,消费打印出来,如果加上 time.sleep(7)
就会报错
这怎么解决
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
加了这个 不会报错了,有没有大佬帮忙分析一下问题出在哪儿