- Preface 前言
- 第1章 引论
- 第2章 编程惯用法
- 第3章 基础语法
- 建议19:有节制地使用 from…import 语句
- 建议20:优先使用 absolute import 来导入模块
- 建议21:i+=1 不等于 ++i
- 建议22:使用 with 自动关闭资源
- 建议23:使用 else 子句简化循环(异常处理)
- 建议24:遵循异常处理的几点基本原则
- 建议25:避免 finally 中可能发生的陷阱
- 建议26:深入理解 None 正确判断对象是否为空
- 建议27:连接字符串应优先使用 join 而不是 +
- 建议28:格式化字符串时尽量使用 .format 方式而不是 %
- 建议29:区别对待可变对象和不可变对象
- 建议30:[]、() 和 {}:一致的容器初始化形式
- 建议31:记住函数传参既不是传值也不是传引用
- 建议32:警惕默认参数潜在的问题
- 建议33:慎用变长参数
- 建议34:深入理解 str() 和 repr() 的区别
- 建议35:分清 staticmethod 和 classmethod 的适用场景
- 第4章 库
- 建议36:掌握字符串的基本用法
- 建议37:按需选择 sort() 或者 sorted()
- 建议38:使用 copy 模块深拷贝对象
- 建议39:使用 Counter 进行计数统计
- 建议40:深入掌握 ConfigParser
- 建议41:使用 argparse 处理命令行参数
- 建议42:使用 pandas 处理大型 CSV 文件
- 建议43:一般情况使用 ElementTree 解析 XML
- 建议44:理解模块 pickle 优劣
- 建议45:序列化的另一个不错的选择 JSON
- 建议46:使用 traceback 获取栈信息
- 建议47:使用 logging 记录日志信息
- 建议48:使用 threading 模块编写多线程程序
- 建议49:使用 Queue 使多线程编程更安全
- 第5章 设计模式
- 第6章 内部机制
- 建议54:理解 built-in objects
- 建议55:init() 不是构造方法
- 建议56:理解名字查找机制
- 建议57:为什么需要 self 参数
- 建议58:理解 MRO 与多继承
- 建议59:理解描述符机制
- 建议60:区别 getattr() 和 getattribute() 方法
- 建议61:使用更为安全的 property
- 建议62:掌握 metaclass
- 建议63:熟悉 Python 对象协议
- 建议64:利用操作符重载实现中缀语法
- 建议65:熟悉 Python 的迭代器协议
- 建议66:熟悉 Python 的生成器
- 建议67:基于生成器的协程及 greenlet
- 建议68:理解 GIL 的局限性
- 建议69:对象的管理与垃圾回收
- 第7章 使用工具辅助项目开发
- 第8章 性能剖析与优化
建议49:使用 Queue 使多线程编程更安全
曾经有这么一个说法,程序中存在3种类型的bug:你的bug、我的bug和多线程。这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的事情。线程间的同步和互斥,线程间数据的共享等这些都是涉及线程安全要考虑的问题。纵然Python中提供了众多的同步和互斥机制,如mutex、condition、event等,但同步和互斥本身就不是一个容易的话题,稍有不慎就会陷入死锁状态或者威胁线程安全。我们来看一个经典的多线程同步问题:生产者消费者模型。如果用Python来实现,你会怎么写?大概思路是这样的:分别创建消费者和生产者线程,生产者往队列里面放产品,消费者从队列里面取出产品,创建一个线程锁以保证线程间操作的互斥性。当队列满的时候消费者进入等待状态,当队列空的时候生产者进入等待状态。我们来看一个具体的Python实现:
import Queue import threading import random writelock = threading.Lock() # 创建锁对象用于控制输出 class Producer(threading.Thread): def __init__(self, q,con,name): super(Producer, self).__init__() self.q = q self.name = name self.con = con print "Producer "+self.name+" Started" def run(self): while 1: global writelock self.con.acquire() # 获取锁对象 if self.q.full(): # 队列满 with writelock: # 输出信息 print 'Queue is full,producer wait!' self.con.wait() # 等待资源 else: value = random.randint(0,10) with writelock: print self.name +" put value "+self.name+":"+ str(value)+ "into queue" self.q.put((self.name+":"+str(value))) # 放入队列中 self.con.notify() # 通知消费者 self.con.release() # 释放锁对象 class Consumer(threading.Thread): # 消费者 def __init__(self, q,con,name): super(Consumer, self).__init__() self.q = q self.con = con self.name = name print "Consumer "+self.name+" started\n " def run(self): while 1: global writelock self.con.acquire() if self.q.empty(): # 队列为空 with writelock: print 'queue is empty,consumer wait!' self.con.wait() # 等待资源 else: value = self.q.get() # 获取一个元素 with writelock: print self.name +"get value"+ value + " from queue" self.con.notify() # 发送消息通知生产者 self.con.release() # 释放锁对象 if __name__ == "__main__": q = Queue.Queue(10) con = threading.Condition() # 条件变量锁 p = Producer(q,con,"P1") p.start() p1 = Producer(q,con,"P2") p1.start() c1 = Consumer(q,con,"C1") c1.start()
上面的程序实现有什么问题吗?回答这个问题之前,我们先来了解一下Queue模块的基本知识。Python中的Queue模块提供了3种队列:
Queue.Queue(maxsize):先进先出,maxsize为队列大小,其值为非正数的时候为无限循环队列。
Queue.LifoQueue(maxsize):后进先出,相当于栈。
Queue.PriorityQueue(maxsize):优先级队列。
这3种队列支持以下方法:
Queue.qsize():返回近似的队列大小。注意,这里之所以加“近似”二字,是因为当该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。
Queue.empty():列队为空的时候返回True,否则返回False。
Queue.full():当设定了队列大小的情况下,如果队列满则返回True,否则返回False。
Queue.put(item[, block[, timeout]]):往队列中添加元素item,block设置为False的时候,如果队列满则抛出Full异常。如果block设置为True,timeout为None的时候则会一直等待直到有空位置,否则会根据timeout的设定超时后抛出Full异常。
Queue.put_nowait(item):等价于put(item, False).block设置为False的时候,如果队列空则抛出Empty异常。如果block设置为True、timeout为None的时候则会一直等待直到有元素可用,否则会根据timeout的设定超时后抛出Empty异常。
Queue.get([block[, timeout]]):从队列中删除元素并返回该元素的值。
Queue.get_nowait():等价于get(False)。
Queue.task_done():发送信号表明入列任务已经完成,经常在消费者线程中用到。
Queue.join():阻塞直至队列中所有的元素处理完毕。
Queue模块实现了多个生产者多个消费者的队列,当多线程之间需要信息安全的交换的时候特别有用,因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支持,它是线程安全的。需要注意的是Queue模块中的列队和collections.deque所表示的队列并不一样,前者主要用于不同线程之间的通信,它内部实现了线程的锁机制;而后者主要是数据结构上的概念,因此支持in方法。
再回过头来看看前面的例子,程序的实现有什么问题呢?答案很明显,作用于queue操作的条件变量完全是不需要的,因为queue本身能够保证线程安全,因此不需要额外的同步机制。那么,该如何修改呢?请读者自行思考。下面的多线程下载的例子也许有助于你完成上面程序的修改。
import os import Queue import threading import urllib2 class DownloadThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: url = self.queue.get() # 从队列中取出一个url 元素 print self.name+"begin download"+url+"..." self.download_file(url) # 进行文件下载 self.queue.task_done() # 下载完毕发送信号 print self.name+" download completed!!!" def download_file(self, url): # 下载文件 urlhandler = urllib2.urlopen(url) fname = os.path.basename(url)+".html" # 文件名称 with open(fname, "wb") as f: # 打开文件 while True: chunk = urlhandler.read(1024) if not chunk: break f.write(chunk) if __name__ == "__main__": urls = ["http://wiki.python.org/moin/WebProgramming", "https://www.createspace.com/3611970", "http://wiki.python.org/moin/Documentation" ] queue = Queue.Queue() # create a thread pool and give them a queue for i in range(5): t = DownloadThread(queue) # 启动5 个线程同时进行下载 t.setDaemon(True) t.start() # give the queue some data for url in urls: queue.put(url) # wait for the queue to finish queue.join()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论