python的queue模块会不会被锁死

发布于 2022-09-03 01:14:32 字数 1792 浏览 24 评论 0

我把抓取的数据通过rpc发送给调度里面的queue,调度里面的有几个线程消费该queue,
刚开始还可以,几个小时后就卡死了,发送给rpc的进程也卡住了,不抓数据了,而queue是满的,却不消费

rpc.py
import rpyc
from scheduler import Scheduler
import time
import queue

scheduler = Scheduler()


class Service(rpyc.Service):
    def exposed_add_task(self, task):
        try:
            scheduler.add_task(task)
        except queue.Full:
            time.sleep(1)


if __name__ == '__main__':
    import threading
    import rpyc.utils.server

    print('scheduler startup')
    scheduler.run()

    print('rpc server startup')
    server = rpyc.utils.server.ThreadedServer(Service, port=50000)
    thread = threading.Thread(target=server.start)
    thread.start()
    try:
        thread.join()
    except KeyboardInterrupt:
        scheduler.stop()
    server.close()
    print('rpc server shutdown')
scheduler.py
class Scheduler:
    def __init__(self, worker_num=8):
        self.worker_num = worker_num
        self.task_queue = queue.Queue(500)
        self._stop = False

    def _work(self):
        while not self._stop:
            try:
                print('queue size', self.task_queue.qsize())
                typ, *data = self.task_queue.get()
            except queue.Empty:
                time.sleep(1)
                continue
            except:
                pass

            self.task_queue.task_done()

    def _init_workers(self):
        for _ in range(self.worker_num):
            t = threading.Thread(target=self._work)
            t.deamon = True
            t.start()

    def add_task(self, task):
        self.task_queue.put(task)

    def run(self):
        self._init_workers()

    def stop(self):
        self._stop = True

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

苦笑流年记忆 2022-09-10 01:14:32

理论上QUEUE不可能死锁,因为你读写的时候自动加Q锁,在占有Q锁的期间不可能再申请其它锁
从这方面来讲,QUEUE对使用者来说是原子的

如果真的存在死锁,那必然是其它锁交叉占用死的,与Q锁无关

要看为何阻塞,先看看阻塞在哪里

建议

另外建议对QUEUE的get和put最好是非阻塞,或者设置一个超时,
超时后可以打印异常信息,应该避免永久阻塞

考虑一种可能的情况

生产者A,消费者B

A生产前先占用锁X,然后PUT(但是队列满了所以阻塞)
B消费前申请锁X,然后GET——但是X申请不到了!被A永久占用

应该避免这种情况,方法是:
1、队列操作不要长时间阻塞,(用非阻塞或设置短超时)
2、避免队列操作时占用其它锁

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文