RabbitMQ 和 python queue.Queue.get() 卡住
我试图将即将到来的数据复制到另一个queue.Queue()以在另一个线程中执行其他操作。
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
这里的配置行
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
# connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
rgb_channel.stop_consuming()
sys.exit(0)
,最后是我失败的queue.Queue().get()函数:
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
rgb_color_bytes = None
while True:
print("POINTER 1")
try:
rgb_frame = READ_QUEUE.get(block=True)
except queue.Empty:
rgb_frame = None
if not rgb_frame:
continue
print("POINTER 2")
它卡在那里。我是线程和队列架构的新手。我正在尝试 add_callbak_threadsafe()
并且我知道 get() 会阻塞线程。但我创建了 2 个不同的线程,就像这里
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
所以如果我创建了 2 个线程,为什么 queue.Queue().get() 会阻塞另一个线程。感谢您的帮助。我可以分享整个代码,它非常简单,几乎 170 行。
I am tring to copy the upcoming datas to another queue.Queue() to do other stuffs in another thread.
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)
and the config lines here
def start_rgb_consume_from_rabbitmq():
try:
# RABBITMQ PART #
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
# connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
rgb_channel = connection.channel()
rgb_channel.queue_declare(queue=RGB_QUEUE)
rgb_channel.queue_purge(queue=RGB_QUEUE)
rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
rgb_channel.start_consuming()
except Exception as err:
print("Exception :", err)
rgb_channel.stop_consuming()
except KeyboardInterrupt:
rgb_channel.stop_consuming()
sys.exit(0)
and finally the queue.Queue().get() function that i failed at:
def rgb_data_read_from_python_queue():
if STATUS2:
cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
rgb_color_bytes = None
while True:
print("POINTER 1")
try:
rgb_frame = READ_QUEUE.get(block=True)
except queue.Empty:
rgb_frame = None
if not rgb_frame:
continue
print("POINTER 2")
it is stucking there. I am new at threading and queue architecture. I am triying add_callbak_threadsafe()
and I know that get() blocks the thread. But i created 2 different thread as here
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()
So if i created 2 threads why queue.Queue().get() blocks the other one. Thanks for your helps. I can share whole code it is really simple and almost 170 lines.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在这里我解决了这个问题,我想为那些试图将数据放入rabbitmq队列并由消费者读取然后将其放入python队列并在其他线程上做一些事情的人发布。我希望它能帮助别人。
Here I solved the issue and i want to publish for the ones who are trying to put data to rabbitmq queue and the read by consumer and then put it to python queue and do some stuffs on other threads. I hope it will help someones.