在Python中,我如何知道进程何时完成?

发布于 2024-10-17 03:34:28 字数 468 浏览 5 评论 0原文

从 Python GUI (PyGTK) 中,我启动一个进程(使用多处理)。该过程需要很长时间(约 20 分钟)才能完成。当该过程完成后,我想清理它(提取结果并加入该过程)。我如何知道该过程何时完成?

我的同事建议在父进程中使用一个繁忙循环来检查子进程是否已完成。当然有更好的方法。

在 Unix 中,当进程分叉时, 当子进程完成时,从父进程内部调用信号处理程序。但我在 Python 中看不到类似的东西。我错过了什么吗?

如何从父进程中观察到子进程的结束? (当然,我不想调用 Process.join(),因为它会冻结 GUI 界面。)

这个问题不仅限于多处理:我在多线程方面也有完全相同的问题。

From within a Python GUI (PyGTK) I start a process (using multiprocessing). The process takes a long time (~20 minutes) to finish. When the process is finished I would like to clean it up (extract the results and join the process). How do I know when the process has finished?

My colleague suggested a busy loop within the parent process that checks if the child process has finished. Surely there is a better way.

In Unix, when a process is forked, a signal handler is called from within the parent process when the child process has finished. But I cannot see anything like that in Python. Am I missing something?

How is it that the end of a child process can be observed from within the parent process? (Of course, I do not want to call Process.join() as it would freeze up the GUI interface.)

This question is not limited to multi-processing: I have exactly the same problem with multi-threading.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

等风来 2024-10-24 03:34:28

我认为作为使python多平台的一部分,像SIGCHLD这样的简单事情必须自己完成。同意,当你只想知道孩子什么时候完成时,这需要做更多的工作,但实际上并没有那么痛苦。考虑以下使用子进程来完成工作的情况,两个 multiprocessing.Event 实例和一个线程来检查子进程是否完成:

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"

编辑

加入到创建的所有进程和线程是一个很好的做法,因为它将有助于指示何时出现僵尸进程(永远不会完成)进程/线程正在创建。我修改了上面的代码,创建了一个继承自 threading.Thread 的 ChildChecker 类。它的唯一目的是在单独的进程中启动作业,等待该进程完成,然后在一切完成后通知 GUI。加入 ChildChecker 也会加入它正在“检查”的进程。现在,如果进程在 5 秒后没有加入,线程将强制终止进程。输入“y”创建启动一个运行“endlessChildsPlay”的子进程,该进程必须演示强制终止。

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"

I think as a part of making python multi-platform, simple things like SIGCHLD must be done yourself. Agreed, this is a little more work when all you want to do is know when the child is done, but it really isn't THAT painful. Consider the following that uses a child process to do the work, two multiprocessing.Event instances, and a thread to check if the child process is done:

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"

EDIT

Joining to all processes and threads created is a good practice because it will help indicate when zombie (never-finishing) processes/threads are being created. I've altered the above code making a ChildChecker class that inherits from threading.Thread. It's sole purpose is to start a job in a separate process, wait for that process to finish, and then notify the GUI when everything is complete. Joining on the ChildChecker will also join the process it is "checking". Now, if the process doesn't join after 5 seconds, the thread will force terminate the process. Enter "y" creates starts a child process running "endlessChildsPlay" that must demonstrate force termination.

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"
余厌 2024-10-24 03:34:28

这个答案真的很简单! (我只花了几天来解决这个问题。)

结合PyGTK的idle_add(),你可以创建一个AutoJoiningThread。总代码非常简单:

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)

如果您想做的不仅仅是连接(例如收集结果),那么您可以扩展上面的类以在完成时发出信号,如以下示例所示:

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

上面示例的输出将取决于线程执行的顺序,但它类似于:

Creating child
Creating thread
Starting thread
Child starting to play.
 Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt.  Quiting.

不可能以相同的方式创建 AutoJoiningProcess(因为我们无法跨两个不同的进程调用idle_add()),但是我们可以使用 AutoJoiningThread 来获取什么我们想要:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()

为了演示 AutoJoiningProcess,这里有另一个例子:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
    print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

结果输出将与上面的例子非常相似,除了这次我们同时有进程加入和它的伴随线程加入:

Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
 Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt.  Quiting.

不幸的是:

  1. 这个解决方案依赖于 gobject,因为使用idle_add()。 gobject 由 PyGTK 使用。
  2. 这不是真正的父子关系。如果其中一个线程由另一个线程启动,那么它仍然会由运行主循环的线程而不是父线程加入。这个问题也适用于 AutoJoiningProcess,除了我想象会抛出异常。

因此,要使用这种方法,最好只从主循环/GUI 内创建线程/进程。

This answer is really simple! (It just took me days to work it out.)

Combined with PyGTK's idle_add(), you can create an AutoJoiningThread. The total code is borderline trivial:

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)

If you want to do more than just join (such as collecting results) then you can extend the above class to emit signals on completion, as is done in the following example:

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

The output of the above example will depend on the order the threads are executed, but it will be similar to:

Creating child
Creating thread
Starting thread
Child starting to play.
 Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt.  Quiting.

It's not possible to create an AutoJoiningProcess in the same way (because we cannot call idle_add() across two different processes), however we can use an AutoJoiningThread to get what we want:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()

To demonstrate AutoJoiningProcess here is another example:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
    print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

The resulting output will be very similar to the example above, except this time we have both the process joining and it's attendant thread joining too:

Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
 Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt.  Quiting.

Unfortunately:

  1. This solution is dependent on gobject, due to the use of idle_add(). gobject is used by PyGTK.
  2. This is not a true parent/child relationship. If one of these threads is started by another thread, then it will nonetheless be joined by the thread running the mainloop, not the parent thread. This problem holds true for AutoJoiningProcess too, except there I imagine an exception would be thrown.

Thus to use this approach, it would be best to only create threads/process from within the mainloop/GUI.

凉宸 2024-10-24 03:34:28

您可以使用 队列 与子进程进行通信。您可以在上面粘贴中间结果,或者指示已达到里程碑的消息(对于进度条),或者只是指示流程已准备好加入的消息。使用 empty 进行轮询既简单又快速。

如果你真的只想知道它是否完成,你可以观看 exitcode< /a> 您的进程或轮询 is_alive()

You can use a queue to communicate with child processes. You can stick intermediate results on it, or messages indicating that milestones have been hit (for progress bars) or just a message indicating that the process is ready to be joined. Polling it with empty is easy and fast.

If you really only want to know if it's done, you can watch the exitcode of your process or poll is_alive().

吃→可爱长大的 2024-10-24 03:34:28

在我努力寻找自己问题的答案时,我偶然发现了 PyGTK 的 idle_add()函数。这给了我以下可能性:

  1. 创建一个通过队列进行通信的新子进程。
  2. 创建一个侦听队列的侦听器线程,当子进程向侦听器发送一条消息说它已完成时,侦听器调用idle_add()来设置回调。
  3. 在下一次主循环期间,父进程将调用回调。
  4. 回调可以提取结果、加入子进程和加入侦听器线程。

这似乎是重新创建 Unix 子进程完成时的调用回调的过于复杂的方法。

这肯定是 Python 中 GUI 的一个非常常见的问题。当然有一个标准模式可以解决这个问题吗?

In my efforts to try to find an answer to my own question, I stumbled across PyGTK's idle_add() function. This gives me the following possibility:

  1. Create a new child process that communicates via a Queue.
  2. Create a listener thread that listens to the Queue, when the child process sends the listener a message saying that it is finished, the listener calls idle_add() that sets up a callback.
  3. During the next time around the main loop the parent process will call the callback.
  4. The callback can extract results, join the child process and join the listener-thread.

This seems an overly complex way to re-create Unix's call-callback-when-child-process-is-done.

This must be an uber-common problem with GUIs in Python. Surely there is a standard pattern to solve this problem?

指尖凝香 2024-10-24 03:34:28

看看 subprocess 模块:

http://docs.python.org/library/subprocess.html

import subprocess
let pipe = subprocess.Popen("ls -l", stdout=subprocess.PIPE)
allText = pipe.stdout.read()
pipe.wait()
retVal = pipe.returncode

have a look at the subprocess module:

http://docs.python.org/library/subprocess.html

import subprocess
let pipe = subprocess.Popen("ls -l", stdout=subprocess.PIPE)
allText = pipe.stdout.read()
pipe.wait()
retVal = pipe.returncode
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文