我的 HelloWorld 队列正常吗?

发布于 2024-12-01 03:00:35 字数 1378 浏览 3 评论 0原文

我即将在应用程序中使用此设计,但我对 python 中的线程和队列内容相当陌生。显然,实际的应用程序不是为了打招呼,但设计是相同的 - 即有一个过程需要一些时间来设置和拆除,但我可以一次完成多个任务。任务会随机到达,并且通常是突发的。

这是一个明智且线程安全的设计吗?

class HelloThing(object):

  def __init__(self):
    self.queue = self._create_worker()

  def _create_worker(self):
    import threading, Queue

    def worker():
      while True:
        things = [q.get()]
        while True:
          try:
            things.append(q.get_nowait())
          except Queue.Empty:
            break
        self._say_hello(things)
        [q.task_done() for task in xrange(len(things))]

    q = Queue.Queue()
    n_worker_threads = 1
    for i in xrange(n_worker_threads):
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()

    return q

  def _say_hello(self, greeting_list):
    import time, sys
    # setup stuff
    time.sleep(1)
    # do some things
    sys.stdout.write('hello {0}!\n'.format(', '.join(greeting_list)))
    # tear down stuff
    time.sleep(1)


if __name__ == '__main__':
  print 'enter __main__'

  import time
  hello = HelloThing()

  hello.queue.put('world')
  hello.queue.put('cruel world')
  hello.queue.put('stack overflow')

  time.sleep(2)

  hello.queue.put('a')
  hello.queue.put('b')

  time.sleep(2)

  for i in xrange(20):
    hello.queue.put(str(i))

  #hello.queue.join()

  print 'finish __main__'

I'm about to put this design into use in an application, but I'm fairly new to threading and Queue stuff in python. Obviously the actual application is not for saying hello, but the design is the same - i.e. there is a process which takes some time to set-up and tear down, but I can do multiple tasks in one hit. Tasks will arrive at random times, and often in bursts.

Is this a sensible and thread safe design?

class HelloThing(object):

  def __init__(self):
    self.queue = self._create_worker()

  def _create_worker(self):
    import threading, Queue

    def worker():
      while True:
        things = [q.get()]
        while True:
          try:
            things.append(q.get_nowait())
          except Queue.Empty:
            break
        self._say_hello(things)
        [q.task_done() for task in xrange(len(things))]

    q = Queue.Queue()
    n_worker_threads = 1
    for i in xrange(n_worker_threads):
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()

    return q

  def _say_hello(self, greeting_list):
    import time, sys
    # setup stuff
    time.sleep(1)
    # do some things
    sys.stdout.write('hello {0}!\n'.format(', '.join(greeting_list)))
    # tear down stuff
    time.sleep(1)


if __name__ == '__main__':
  print 'enter __main__'

  import time
  hello = HelloThing()

  hello.queue.put('world')
  hello.queue.put('cruel world')
  hello.queue.put('stack overflow')

  time.sleep(2)

  hello.queue.put('a')
  hello.queue.put('b')

  time.sleep(2)

  for i in xrange(20):
    hello.queue.put(str(i))

  #hello.queue.join()

  print 'finish __main__'

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

诗笺 2024-12-08 03:00:35
  1. 线程安全由 Queue 实现处理(如果需要,您也必须在 _say_hello 实现中处理)。

  2. 突发处理程序问题:突发应该仅由单个线程处理。(例如:假设您的进程设置/拆卸需要 10 秒;在第 1 秒,所有线程将从第 0 秒开始忙于突发,在第 5 秒新任务(或突发),但没有可用的线程来处理它们/它)。因此,突发应该由特定时间窗口的最大任务数(或者可能是“无限”)来定义。队列中的条目应该是任务列表。

如何对突发任务列表进行分组?
我以代码的形式提供了一个解决方案,更容易解释...

producer_q = Queue()
def _burst_thread():
   while True:
      available_tasks = [producer_q.get()]
      time.sleep(BURST_TIME_WINDOW)
      available_tasks.extend(producer_q.get() # I'm the single consumer, so will be at least qsize elements  
                             for i in range(producer_q.qsize()))
      consumer_q.push(available_tasks)

如果您想突发最多的消息,您只需将 available_tasks 切片到多个列表中即可。

  1. The thread safety is handled by Queue implementation (also you must handle in your _say_hello implementation if it is required).

  2. Burst handler problem: A burst should be handled by a single thread only.(ex: let's say your process setup/teardown takes 10 seconds; at second 1 all threads will be busy with burst from sec 0, on second 5 a new task(or burst) but no thread available to handle them/it). So a burst should be defined by max number of tasks (or maybe "infinite") for a specific time-window. An entry in queue should be a list of tasks.

How can you group burst tasks list?
I provide a solution as code, more easy to explain ...

producer_q = Queue()
def _burst_thread():
   while True:
      available_tasks = [producer_q.get()]
      time.sleep(BURST_TIME_WINDOW)
      available_tasks.extend(producer_q.get() # I'm the single consumer, so will be at least qsize elements  
                             for i in range(producer_q.qsize()))
      consumer_q.push(available_tasks)

If you want to have a maximum of messages in a burst, you just need to slice the available_tasks in multiple lists.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文