Python队列在获取下一个项目之前等待线程

发布于 2024-11-08 02:50:50 字数 1750 浏览 0 评论 0原文

我有一个队列,当项目添加到队列中时,它总是需要准备好处理它们。在队列中的每个项目上运行的函数创建并启动线程在后台执行操作,以便程序可以执行其他操作。

但是,我对队列中的每个项目调用的函数只是启动线程,然后完成执行,无论它启动的线程是否完成。因此,在程序处理完最后一项之前,循环将移至队列中的下一项。

下面的代码可以更好地演示我想要做的事情:

queue = Queue.Queue()
t = threading.Thread(target=worker)
t.start()

def addTask():
    queue.put(SomeObject())

def worker():
    while True:
        try:
            # If an item is put onto the queue, immediately execute it (unless 
            # an item on the queue is still being processed, in which case wait 
            # for it to complete before moving on to the next item in the queue)
            item = queue.get()
            runTests(item)
            # I want to wait for 'runTests' to complete before moving past this point
        except Queue.Empty, err:
            # If the queue is empty, just keep running the loop until something 
            # is put on top of it.
            pass

def runTests(args):
    op_thread = SomeThread(args)
    op_thread.start()
    # My problem is once this last line 't.start()' starts the thread, 
    # the 'runTests' function completes operation, but the operation executed
    # by some thread is not yet done executing because it is still running in
    # the background. I do not want the 'runTests' function to actually complete
    # execution until the operation in thread t is done executing.
    """t.join()"""
    # I tried putting this line after 't.start()', but that did not solve anything.
    # I have commented it out because it is not necessary to demonstrate what 
    # I am trying to do, but I just wanted to show that I tried it.

一些注释:

这一切都在 PyGTK 应用程序中运行。一旦“SomeThread”操作完成,它就会向 GUI 发送回调以显示操作结果。

我不知道这对我遇到的问题有多大影响,但我认为这可能很重要。

I have a queue that always needs to be ready to process items when they are added to it. The function that runs on each item in the queue creates and starts thread to execute the operation in the background so the program can go do other things.

However, the function I am calling on each item in the queue simply starts the thread and then completes execution, regardless of whether or not the thread it started completed. Because of this, the loop will move on to the next item in the queue before the program is done processing the last item.

Here is code to better demonstrate what I am trying to do:

queue = Queue.Queue()
t = threading.Thread(target=worker)
t.start()

def addTask():
    queue.put(SomeObject())

def worker():
    while True:
        try:
            # If an item is put onto the queue, immediately execute it (unless 
            # an item on the queue is still being processed, in which case wait 
            # for it to complete before moving on to the next item in the queue)
            item = queue.get()
            runTests(item)
            # I want to wait for 'runTests' to complete before moving past this point
        except Queue.Empty, err:
            # If the queue is empty, just keep running the loop until something 
            # is put on top of it.
            pass

def runTests(args):
    op_thread = SomeThread(args)
    op_thread.start()
    # My problem is once this last line 't.start()' starts the thread, 
    # the 'runTests' function completes operation, but the operation executed
    # by some thread is not yet done executing because it is still running in
    # the background. I do not want the 'runTests' function to actually complete
    # execution until the operation in thread t is done executing.
    """t.join()"""
    # I tried putting this line after 't.start()', but that did not solve anything.
    # I have commented it out because it is not necessary to demonstrate what 
    # I am trying to do, but I just wanted to show that I tried it.

Some notes:

This is all running in a PyGTK application. Once the 'SomeThread' operation is complete, it sends a callback to the GUI to display the results of the operation.

I do not know how much this affects the issue I am having, but I thought it might be important.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

层林尽染 2024-11-15 02:50:50

Python 线程的一个基本问题是你不能直接杀死它们——它们必须同意死亡

您应该做的是:

  1. 将线程实现为类
  2. 添加一个 threading.Event 成员,join 方法会清除该成员,并且线程的主循环偶尔会检查该成员。如果它发现它被清除,它就会返回。对于此重写,threading.Thread.join 检查事件,然后对其自身调用 Thread.join
  3. 要允许 (2),请从 Queue 进行读取code> 块有一些小的超时。这样你的线程对终止请求的“响应时间”将是超时,并且不会发生 CPU 阻塞

这是来自我的套接字客户端线程的一些代码,它与队列阻塞有相同的问题:

class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are placed in
        the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=Queue.Queue(), reply_q=Queue.Queue()):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q
        self.reply_q = reply_q
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)

Note self .aliverun 中的循环。

A fundamental issue with Python threads is that you can't just kill them - they have to agree to die.

What you should do is:

  1. Implement the thread as a class
  2. Add a threading.Event member which the join method clears and the thread's main loop occasionally checks. If it sees it's cleared, it returns. For this override threading.Thread.join to check the event and then call Thread.join on itself
  3. To allow (2), make the read from Queue block with some small timeout. This way your thread's "response time" to the kill request will be the timeout, and OTOH no CPU choking is done

Here's some code from a socket client thread I have that has the same issue with blocking on a queue:

class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are placed in
        the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=Queue.Queue(), reply_q=Queue.Queue()):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q
        self.reply_q = reply_q
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)

Note self.alive and the loop in run.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文