返回介绍

建议49:使用 Queue 使多线程编程更安全

发布于 2024-01-30 22:19:09 字数 5163 浏览 0 评论 0 收藏 0

曾经有这么一个说法,程序中存在3种类型的bug:你的bug、我的bug和多线程。这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的事情。线程间的同步和互斥,线程间数据的共享等这些都是涉及线程安全要考虑的问题。纵然Python中提供了众多的同步和互斥机制,如mutex、condition、event等,但同步和互斥本身就不是一个容易的话题,稍有不慎就会陷入死锁状态或者威胁线程安全。我们来看一个经典的多线程同步问题:生产者消费者模型。如果用Python来实现,你会怎么写?大概思路是这样的:分别创建消费者和生产者线程,生产者往队列里面放产品,消费者从队列里面取出产品,创建一个线程锁以保证线程间操作的互斥性。当队列满的时候消费者进入等待状态,当队列空的时候生产者进入等待状态。我们来看一个具体的Python实现:

import Queue
import threading
import random
writelock = threading.Lock()                      #
创建锁对象用于控制输出
class Producer(threading.Thread):
     def __init__(self, q,con,name):
         super(Producer, self).__init__()
         self.q = q
         self.name = name
         self.con = con
         print "Producer "+self.name+" Started"
     def run(self):
         while 1:
              global writelock
              self.con.acquire()              #
获取锁对象
              if self.q.full():               #
队列满
                   with writelock:            #
输出信息
                       print 'Queue is full,producer wait!'
                   self.con.wait()            #
等待资源
              else:
                   value = random.randint(0,10)
                   with writelock:
                       print self.name +" put value 
                         "+self.name+":"+ str(value)+
                         "into queue"
              self.q.put((self.name+":"+str(value)))    #
放入队列中
              self.con.notify()               #
通知消费者
         self.con.release()                   #
释放锁对象
class Consumer(threading.Thread):                     #
消费者
     def __init__(self, q,con,name):
         super(Consumer, self).__init__()
         self.q = q
         self.con = con
         self.name = name
         print "Consumer "+self.name+" started\n "
     def run(self):
         while 1:
              global writelock
              self.con.acquire()
              if self.q.empty():              #
队列为空
                   with writelock:
                       print 'queue is empty,consumer wait!'
                   self.con.wait()            #
等待资源
              else:
                   value = self.q.get()         #
获取一个元素
                   with writelock:
                       print self.name +"get value"+ 
                         value + " from queue"
                   self.con.notify()           #
发送消息通知生产者
              self.con.release()               #
释放锁对象
if __name__ == "__main__":  
     q = Queue.Queue(10)
     con = threading.Condition()                   #
条件变量锁
     p = Producer(q,con,"P1")
     p.start()
     p1 = Producer(q,con,"P2")
     p1.start()
     c1 = Consumer(q,con,"C1")
     c1.start()

上面的程序实现有什么问题吗?回答这个问题之前,我们先来了解一下Queue模块的基本知识。Python中的Queue模块提供了3种队列:

Queue.Queue(maxsize):先进先出,maxsize为队列大小,其值为非正数的时候为无限循环队列。

Queue.LifoQueue(maxsize):后进先出,相当于栈。

Queue.PriorityQueue(maxsize):优先级队列。

这3种队列支持以下方法:

Queue.qsize():返回近似的队列大小。注意,这里之所以加“近似”二字,是因为当该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。

Queue.empty():列队为空的时候返回True,否则返回False。

Queue.full():当设定了队列大小的情况下,如果队列满则返回True,否则返回False。

Queue.put(item[, block[, timeout]]):往队列中添加元素item,block设置为False的时候,如果队列满则抛出Full异常。如果block设置为True,timeout为None的时候则会一直等待直到有空位置,否则会根据timeout的设定超时后抛出Full异常。

Queue.put_nowait(item):等价于put(item, False).block设置为False的时候,如果队列空则抛出Empty异常。如果block设置为True、timeout为None的时候则会一直等待直到有元素可用,否则会根据timeout的设定超时后抛出Empty异常。

Queue.get([block[, timeout]]):从队列中删除元素并返回该元素的值。

Queue.get_nowait():等价于get(False)。

Queue.task_done():发送信号表明入列任务已经完成,经常在消费者线程中用到。

Queue.join():阻塞直至队列中所有的元素处理完毕。

Queue模块实现了多个生产者多个消费者的队列,当多线程之间需要信息安全的交换的时候特别有用,因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支持,它是线程安全的。需要注意的是Queue模块中的列队和collections.deque所表示的队列并不一样,前者主要用于不同线程之间的通信,它内部实现了线程的锁机制;而后者主要是数据结构上的概念,因此支持in方法。

再回过头来看看前面的例子,程序的实现有什么问题呢?答案很明显,作用于queue操作的条件变量完全是不需要的,因为queue本身能够保证线程安全,因此不需要额外的同步机制。那么,该如何修改呢?请读者自行思考。下面的多线程下载的例子也许有助于你完成上面程序的修改。

import os
import Queue
import threading
import urllib2
class DownloadThread(threading.Thread):
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue
  def run(self):
    while True:
      url = self.queue.get()        # 
从队列中取出一个url
元素
      print self.name+"begin download"+url+"..."
      self.download_file(url)       # 
进行文件下载
      self.queue.task_done()        # 
下载完毕发送信号
      print self.name+" download completed!!!"
  def download_file(self, url):         #
下载文件
    urlhandler = urllib2.urlopen(url)
    fname = os.path.basename(url)+".html"   #
文件名称
    with open(fname, "wb") as f:      #
打开文件
      while True:
        chunk = urlhandler.read(1024)
        if not chunk: break
        f.write(chunk)
if __name__ == "__main__":
  urls = ["http://wiki.python.org/moin/WebProgramming",
      "https://www.createspace.com/3611970",
      "http://wiki.python.org/moin/Documentation"
    ]
  queue = Queue.Queue()
  # create a thread pool and give them a queue
  for i in range(5):
    t = DownloadThread(queue)         #
启动5
个线程同时进行下载
    t.setDaemon(True)
    t.start()
  # give the queue some data
  for url in urls:
    queue.put(url)
  # wait for the queue to finish
  queue.join()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文