python的queue模块会不会被锁死
我把抓取的数据通过rpc发送给调度里面的queue,调度里面的有几个线程消费该queue,
刚开始还可以,几个小时后就卡死了,发送给rpc的进程也卡住了,不抓数据了,而queue是满的,却不消费
rpc.py
import rpyc
from scheduler import Scheduler
import time
import queue
scheduler = Scheduler()
class Service(rpyc.Service):
def exposed_add_task(self, task):
try:
scheduler.add_task(task)
except queue.Full:
time.sleep(1)
if __name__ == '__main__':
import threading
import rpyc.utils.server
print('scheduler startup')
scheduler.run()
print('rpc server startup')
server = rpyc.utils.server.ThreadedServer(Service, port=50000)
thread = threading.Thread(target=server.start)
thread.start()
try:
thread.join()
except KeyboardInterrupt:
scheduler.stop()
server.close()
print('rpc server shutdown')
scheduler.py
class Scheduler:
def __init__(self, worker_num=8):
self.worker_num = worker_num
self.task_queue = queue.Queue(500)
self._stop = False
def _work(self):
while not self._stop:
try:
print('queue size', self.task_queue.qsize())
typ, *data = self.task_queue.get()
except queue.Empty:
time.sleep(1)
continue
except:
pass
self.task_queue.task_done()
def _init_workers(self):
for _ in range(self.worker_num):
t = threading.Thread(target=self._work)
t.deamon = True
t.start()
def add_task(self, task):
self.task_queue.put(task)
def run(self):
self._init_workers()
def stop(self):
self._stop = True
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
理论上QUEUE不可能死锁,因为你读写的时候自动加Q锁,在占有Q锁的期间不可能再申请其它锁
从这方面来讲,QUEUE对使用者来说是原子的
要看为何阻塞,先看看阻塞在哪里
另外建议对QUEUE的get和put最好是非阻塞,或者设置一个超时,
超时后可以打印异常信息,应该避免永久阻塞
生产者A,消费者B
A生产前先占用锁X,然后PUT(但是队列满了所以阻塞)
B消费前申请锁X,然后GET——但是X申请不到了!被A永久占用
应该避免这种情况,方法是:
1、队列操作不要长时间阻塞,(用非阻塞或设置短超时)
2、避免队列操作时占用其它锁