从Job_quee执行长期运行任务时,频道是关闭的错误
我有一个在Python中实现的Job_queue,该Quequeue已订阅为RabbitMQ队列。 RabbitMQ队列定期从服务器更新。没有关于兔子何时填充的固定时间表。当这种情况发生时,业务逻辑是执行一项长期运行的任务,这些任务可能在5分钟之间,最多5小时。
在我的Job_queue实现中,我将最大大小设置为1,而连接的预摘要计数为1。这是一次只允许一次从RabbitMQ中获取一个事件。目的是挑选一个事件,执行它,然后继续进行另一个事件。我还将连接的心跳设置设置为0,我认为这意味着没有超时。但是我注意到我会关闭一个频道。错误当兔子中有1个以上的事件并终止我的代码块。对于如何在没有RabbitMQ超时或连接超时失败的情况下,如何最好地完成我的长期任务。
我的代码块代表工作队列和回调的小摘录如下:
class从: def init (self): self.job_queue = queue.queue(maxsize = 1) self.job_item =“”
def push_to_job_queue(self, obj):
print("Inside push_to_job_queue")
try:
self.job_item = obj
self.job_queue.put(self.job_item)
threading.Thread(target=self.get_from_job_queue, daemon=True).start()
print("JobQueue being joined")
self.job_queue.join()
except Exception as e:
self.job_queue.join()
print("Failure in slaveConsumer.py report: {}".format(e), "error")
def get_from_job_queue(self):
# def _worker():
print("Inside get_from_job_queue")
if self.job_queue.empty() and self.job_queue.qsize() == 0:
print("Queue is Empty do nothing")
else:
json_body = self.job_queue.get()
# This is a long running task
self._parse_object_from_queue(json_body)
self.job_queue.task_done()
I have a job_queue implemented in python which is subscribed on to rabbitmq queue. The rabbitmq queue is updated from a server periodically. No fixed timelines as to when the rabbitmq would be populated. When this happens, the business logic is to run a long-running task where these tasks could be anywhere between 5 minutes, up to 5 hours.
In my job_queue implementation, I have set the max size to be 1 and the prefetch count for the connection to be 1. This is to allow only one event from the rabbitmq to be picked up at a time. The intention is to pick up one event, execute it and then proceed to another. I have also set the heartbeat setting of the connection as 0, which I believe implies don't timeout. Yet I notice that I get a Channel is closed. error when there is more than 1 event in the rabbitmq and it terminates my code block. Would appreciate pointers as to how best to run my long-running task without the rabbitmq timeouts or connection timeout failures.
A small excerpt from my code-block representing the job queue and the callbacks is as follows:
class slave:
def init(self):
self.job_queue = queue.Queue(maxsize=1)
self.job_item = ""
def push_to_job_queue(self, obj):
print("Inside push_to_job_queue")
try:
self.job_item = obj
self.job_queue.put(self.job_item)
threading.Thread(target=self.get_from_job_queue, daemon=True).start()
print("JobQueue being joined")
self.job_queue.join()
except Exception as e:
self.job_queue.join()
print("Failure in slaveConsumer.py report: {}".format(e), "error")
def get_from_job_queue(self):
# def _worker():
print("Inside get_from_job_queue")
if self.job_queue.empty() and self.job_queue.qsize() == 0:
print("Queue is Empty do nothing")
else:
json_body = self.job_queue.get()
# This is a long running task
self._parse_object_from_queue(json_body)
self.job_queue.task_done()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论