RabbitMQ and Python queue.Queue.get() gets stuck

Posted on 2025-01-11 04:17:00

I am trying to copy incoming data into another queue.Queue() so that a second thread can do further processing on it.

def rgb_callback(ch, method, properties, body):
    rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
    READ_QUEUE.put(item=rgb_color_bytes, block=True)

Here are the configuration lines:

def start_rgb_consume_from_rabbitmq():
    try:

        #       RABBITMQ PART       #

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
        # connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
        rgb_channel = connection.channel()
        rgb_channel.queue_declare(queue=RGB_QUEUE)
        rgb_channel.queue_purge(queue=RGB_QUEUE)
        rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)

        rgb_channel.start_consuming()

    except Exception as err:
        print("Exception :", err)
        rgb_channel.stop_consuming()

    except KeyboardInterrupt:
        rgb_channel.stop_consuming()
        sys.exit(0)

And finally, the function that calls queue.Queue().get(), which is where it fails:

def rgb_data_read_from_python_queue():
    if STATUS2:
        cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
    rgb_color_bytes = None
    while True:
        print("POINTER 1")
        try:
            rgb_frame = READ_QUEUE.get(block=True)
        except queue.Empty:
            rgb_frame = None

        if not rgb_frame:
            continue

        print("POINTER 2")

It gets stuck there. I am new to threading and queue architecture. I have been experimenting with add_callback_threadsafe(), and I know that get() blocks the calling thread. But I created two separate threads, as shown here:

rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()

So if I created two threads, why does queue.Queue().get() block the other one? Thanks for your help. I can share the whole code; it is really simple, about 170 lines.
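
Two details in the reader loop above are worth checking. First, queue.Queue.get(block=True) with no timeout waits indefinitely and never raises queue.Empty, so the except branch cannot run. Second, `if not rgb_frame:` applied to a NumPy array with more than one element raises ValueError (the truth value is ambiguous), which would end the reader thread; comparing against None explicitly avoids that. Below is a minimal sketch of a polling read with a timeout, using only the standard library. The names `poll_frame` and `demo_queue` are illustrative assumptions and do not come from the original code.

```python
import queue


def poll_frame(q, timeout=0.1):
    """Return the next item, or None if nothing arrives within `timeout` seconds."""
    try:
        # With a timeout, get() raises queue.Empty once the wait expires.
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        return None


demo_queue = queue.Queue()          # hypothetical stand-in for READ_QUEUE

first = poll_frame(demo_queue)      # queue is empty, so this times out
demo_queue.put(b"\x00\x01\x02")     # stand-in for a frame payload
second = poll_frame(demo_queue)

# Compare against None explicitly instead of relying on truthiness.
if first is None:
    print("no frame yet")
if second is not None:
    print("got", len(second), "bytes")
```

Whether a timeout is wanted at all depends on the design: a plain blocking get() is fine when the thread has nothing else to do, as long as no later line raises.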

Comments (1)

一腔孤↑勇 2025-01-18 04:17:00

I solved the issue, and I am posting the result for anyone who is trying to publish data to a RabbitMQ queue, read it with a consumer, put it into a Python queue, and process it on another thread. I hope it helps someone.

#       RGB CONSUME     #

import numpy as np
import pika
import sys
import cv2
import queue
import threading

#       MACRO DEFINITIONS       #

RGB_QUEUE = 'RGBStream0'
WINDOW_TITLE = 'RGB Stream Consumer1'
WINDOW_TITLE2 = 'From Python Queue'
HOST_NAME = 'localhost'
READ_QUEUE = queue.Queue(200)
CONSUMER_THREAD_NAME = 'ConsumerThread'
THREAD_STOP_FLAG = False
TEST_FLAG = False


#       PARAMETER CHECK      #

# The STATUS variables record whether the script only distributes frames, or distributes and displays them at the same time.
STATUS = None
STATUS2 = None
if len(sys.argv) > 1:
    if sys.argv[1] == '-display':
        STATUS = False
        STATUS2 = True
    else:
        print("Invalid parameter")
        sys.exit(1)
else:
    pass


#       LOCAL FUNCTIONS     #

def rgb_callback(ch, method, properties, body):
    rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
    READ_QUEUE.put(item=rgb_color_bytes, block=True)
    print(rgb_color_bytes)


def start_rgb_consume_from_rabbitmq():
    # Initialise first so the except blocks can tell whether the channel was ever created.
    rgb_channel = None
    try:

        #       RABBITMQ PART       #

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
        rgb_channel = connection.channel()
        rgb_channel.queue_declare(queue=RGB_QUEUE)
        rgb_channel.queue_purge(queue=RGB_QUEUE)
        rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
        if STATUS:
            cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_AUTOSIZE)

        if STATUS or STATUS2:
            print(' *** Waiting for messages *** Display on *** Press CTRL+C to exit ***')
        else:
            print(' *** Waiting for messages *** Use -display to show frames *** Press CTRL+C to exit ***')

        # Blocks this thread and invokes rgb_callback for every delivered message.
        rgb_channel.start_consuming()

    except Exception as err:
        print("Exception :", err)
        if rgb_channel is not None:
            rgb_channel.stop_consuming()

    except KeyboardInterrupt:
        print('Interrupted ^^ Channel closed')
        if rgb_channel is not None:
            rgb_channel.stop_consuming()
        sys.exit(0)


def rgb_data_read_from_python_queue():
    if STATUS2:
        cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)

    while True:
        rgb_frame = READ_QUEUE.get(block=True)
        # 640 * 480
        if rgb_frame.size == 921600:
            rgb_data_reshaped = np.reshape(rgb_frame, [480, 640, 3])
        # 1280 * 720
        elif rgb_frame.size == 2764800:
            rgb_data_reshaped = np.reshape(rgb_frame, [720, 1280, 3])
        # 1920 * 1080
        elif rgb_frame.size == 6220800:
            rgb_data_reshaped = np.reshape(rgb_frame, [1080, 1920, 3])
        else:
            print("Unexpected frame size:", rgb_frame.size)
            # Raises SystemExit in this thread only; the consumer thread keeps running.
            sys.exit(1)

        if STATUS2:
            cv2.imshow(WINDOW_TITLE2, rgb_data_reshaped)
            cv2.waitKey(1)


try:
    rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
    consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
    rgb_data_thread.start()
    consumer_thread.start()
except KeyboardInterrupt:
    print('Interrupted')
    cv2.destroyAllWindows()
    sys.exit(0)
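
The script above runs both loops until the process is killed. In CPython, KeyboardInterrupt is raised in the main thread only, so the handlers inside the two worker functions are unlikely to fire. One common alternative, sketched here under assumptions, is to stop the reader thread with a sentinel value and to look up the frame shape in a table rather than an if/elif chain. The names `FRAME_SHAPES`, `STOP`, `reader`, `frames` and `results` are hypothetical, and plain bytes objects stand in for the RabbitMQ message bodies so that the sketch runs without a broker or NumPy.

```python
import queue
import threading

# Byte count of a 3-channel uint8 frame -> (height, width, channels).
FRAME_SHAPES = {
    480 * 640 * 3: (480, 640, 3),      # 921600 bytes
    720 * 1280 * 3: (720, 1280, 3),    # 2764800 bytes
    1080 * 1920 * 3: (1080, 1920, 3),  # 6220800 bytes
}
STOP = None  # sentinel: tells the reader thread to exit


def reader(q, results):
    """Consume items until the STOP sentinel arrives."""
    while True:
        frame = q.get()                # blocks until an item is available
        if frame is STOP:              # identity check, safe for any item type
            break
        shape = FRAME_SHAPES.get(len(frame))
        if shape is None:
            results.append(("skipped", len(frame)))
            continue
        # In the real script this is where the reshape would happen, e.g.
        # np.frombuffer(frame, dtype=np.uint8).reshape(shape)
        results.append(("frame", shape))


frames = queue.Queue(maxsize=200)
results = []
worker = threading.Thread(target=reader, args=(frames, results))
worker.start()

frames.put(bytes(480 * 640 * 3))       # a blank 640x480 frame
frames.put(b"too short")               # unknown size, gets skipped
frames.put(STOP)                       # ask the reader to finish
worker.join()
print(results)
```

Skipping an unexpected frame instead of exiting keeps the reader alive, so the bounded queue keeps draining and the pika callback's blocking put() does not stall once the queue fills up.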