Channel is closed error when running a long-running task from a job_queue

Posted 2025-02-14 00:55:56


I have a job_queue implemented in Python that is subscribed to a RabbitMQ queue. The RabbitMQ queue is updated by a server periodically, with no fixed schedule for when it is populated. When that happens, the business logic is to run a long-running task, where each task can take anywhere from 5 minutes up to 5 hours.

In my job_queue implementation I have set the max size to 1 and the prefetch count for the connection to 1, so that only one event is picked up from RabbitMQ at a time. The intention is to pick up one event, execute it, and then proceed to the next. I have also set the heartbeat setting of the connection to 0, which I believe disables the heartbeat timeout. Yet I notice that I get a "Channel is closed" error when there is more than one event in RabbitMQ, and it terminates my code. I would appreciate pointers on how best to run my long-running task without RabbitMQ timeouts or connection timeout failures.
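For context, the connection settings described above might be wired up roughly as follows with pika (the client library the code style implies; the actual connection code is not shown, so the host and queue name are placeholders). The threaded acknowledgement follows the pattern from pika's own threaded-consumer examples rather than anything in the question:

```python
import functools
import threading

# pika is assumed, not shown in the question; guard the import so the
# sketch stays importable where pika is not installed.
try:
    import pika
except ImportError:
    pika = None

PREFETCH_COUNT = 1  # one unacknowledged message at a time, as in the question
HEARTBEAT = 0       # 0 disables AMQP heartbeats entirely

def handle_long_task(body):
    """Placeholder for the 5-minute-to-5-hour job."""

def on_message(channel, method, properties, body, connection):
    # Run the long task on a worker thread so pika's I/O loop keeps
    # servicing the connection, then acknowledge from that thread via the
    # thread-safe callback (pika's recommended threaded-consumer pattern).
    def work():
        handle_long_task(body)
        ack = functools.partial(channel.basic_ack,
                                delivery_tag=method.delivery_tag)
        connection.add_callback_threadsafe(ack)
    threading.Thread(target=work, daemon=True).start()

def consume(host="localhost", queue_name="jobs"):  # placeholder names
    params = pika.ConnectionParameters(host=host, heartbeat=HEARTBEAT)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=PREFETCH_COUNT)
    on_msg = functools.partial(on_message, connection=connection)
    channel.basic_consume(queue=queue_name, on_message_callback=on_msg)
    channel.start_consuming()
```

Note that blocking inside the message callback itself, as the excerpt below does via `job_queue.join()`, stalls this I/O loop for the duration of the task, which is a common cause of closed channels on long-running consumers.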

A small excerpt from my code-block representing the job queue and the callbacks is as follows:

import queue
import threading

class slave:
    def __init__(self):
        self.job_queue = queue.Queue(maxsize=1)
        self.job_item = ""

    def push_to_job_queue(self, obj):
        print("Inside push_to_job_queue")
        try:
            self.job_item = obj
            self.job_queue.put(self.job_item)
            threading.Thread(target=self.get_from_job_queue, daemon=True).start()
            print("JobQueue being joined")
            self.job_queue.join()
        except Exception as e:
            self.job_queue.join()
            print("Failure in slaveConsumer.py report: {}".format(e), "error")

    def get_from_job_queue(self):
        print("Inside get_from_job_queue")
        if self.job_queue.empty() and self.job_queue.qsize() == 0:
            print("Queue is Empty do nothing")
        else:
            json_body = self.job_queue.get()
            # This is a long running task
            self._parse_object_from_queue(json_body)
            self.job_queue.task_done()
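The queue discipline in this excerpt (a single-slot `queue.Queue` handed to a worker thread, with `join()` blocking until `task_done()`) can be exercised in isolation with a stdlib-only sketch; the `upper()` call stands in for the long-running parse:

```python
import queue
import threading

job_queue = queue.Queue(maxsize=1)  # same single-slot queue as the excerpt
results = []

def worker():
    # Pull the one item, run the (here: trivial) long task, then mark it
    # done so join() in the pushing thread unblocks.
    item = job_queue.get()
    results.append(item.upper())  # stands in for _parse_object_from_queue
    job_queue.task_done()

def push(obj):
    job_queue.put(obj)  # would block if a job were already in flight
    threading.Thread(target=worker, daemon=True).start()
    job_queue.join()    # wait until task_done() is called

push("event-1")
push("event-2")
print(results)  # ['EVENT-1', 'EVENT-2']
```

Because `push` blocks on `join()` until the worker finishes, the jobs run strictly one at a time; the same blocking is what keeps the RabbitMQ callback occupied for the whole task.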
