多重处理:我怎样才能“”从子进程重定向标准输出?

发布于 2024-12-09 05:10:47 字数 2052 浏览 0 评论 0原文

注意。我已经看到 multiprocessing.Process 的日志输出 - 不幸的是,它没有回答这个问题问题。

我正在通过多处理创建一个子进程(在 Windows 上)。我希望子进程的所有 stdout 和 stderr 输出都重定向到日志文件,而不是出现在控制台上。我看到的唯一建议是让子进程将 sys.stdout 设置为文件。但是,由于 Windows 上的 stdout 重定向行为,这并不能有效地重定向所有 stdout 输出。

为了说明问题,使用以下代码构建一个 Windows DLL

#include <iostream>

extern "C"
{
    __declspec(dllexport) void writeToStdOut()
    {
        std::cout << "Writing to STDOUT from test DLL" << std::endl;
    }
}

然后创建并运行如下所示的 python 脚本,该脚本导入此 DLL 并调用该函数:

from ctypes import *
import sys

print
print "Writing to STDOUT from python, before redirect"
print
sys.stdout = open("stdout_redirect_log.txt", "w")
print "Writing to STDOUT from python, after redirect"

testdll = CDLL("Release/stdout_test.dll")
testdll.writeToStdOut()

为了看到与我相同的行为,该 DLL 可能是必要的针对与 Python 使用的不同的 C 运行时构建。就我而言,python 是使用 Visual Studio 2010 构建的,但我的 DLL 是使用 VS 2005 构建的。

我看到的行为是控制台显示:

> stdout_test.py

Writing to STDOUT from python, before redirect

Writing to STDOUT from test DLL

虽然文件 stdout_redirect_log.txt 最终包含:

Writing to STDOUT from python, after redirect

换句话说,设置 sys.stdout 失败重定向 DLL 生成的 stdout 输出。考虑到 Windows 中标准输出重定向的底层 API 的性质,这并不奇怪。我之前在本机/C++ 级别遇到过这个问题,但从未找到一种方法来可靠地从进程内重定向标准输出。它必须在外部完成。

这实际上就是我启动子进程的原因 - 这样我就可以从外部连接到它的管道,从而保证我拦截它的所有输出。我绝对可以通过使用 pywin32 手动启动进程来做到这一点,但我非常希望能够使用多处理的设施,特别是通过多处理 Pipe 对象与子进程通信的能力,以便取得进展更新。问题是是否有任何方法可以为其 IPC 设施使用多处理,以可靠地将所有子进程的 stdout 和 stderr 输出重定向到文件。

更新:查看 multiprocessing.Processs 的源代码,它有一个静态成员 _Popen,看起来可以用来重写用于创建进程的类。如果它设置为 None (默认),它会使用 multiprocessing.forking._Popen,但看起来

multiprocessing.Process._Popen = MyPopenClass

我可以覆盖进程创建。然而,虽然我可以从 multiprocessing.forking._Popen 中得出这一点,但看起来我必须将一堆内部内容复制到我的实现中,这听起来很不稳定,而且不太面向未来。如果这是唯一的选择,我想我可能会选择使用 pywin32 手动完成整个事情。

NB. I have seen Log output of multiprocessing.Process - unfortunately, it doesn't answer this question.

I am creating a child process (on windows) via multiprocessing. I want all of the child process's stdout and stderr output to be redirected to a log file, rather than appearing at the console. The only suggestion I have seen is for the child process to set sys.stdout to a file. However, this does not effectively redirect all stdout output, due to the behaviour of stdout redirection on Windows.

To illustrate the problem, build a Windows DLL with the following code

#include <iostream>

extern "C"
{
    __declspec(dllexport) void writeToStdOut()
    {
        std::cout << "Writing to STDOUT from test DLL" << std::endl;
    }
}

Then create and run a python script like the following, which imports this DLL and calls the function:

from ctypes import *
import sys

print
print "Writing to STDOUT from python, before redirect"
print
sys.stdout = open("stdout_redirect_log.txt", "w")
print "Writing to STDOUT from python, after redirect"

testdll = CDLL("Release/stdout_test.dll")
testdll.writeToStdOut()

In order to see the same behaviour as me, it is probably necessary for the DLL to be built against a different C runtime than than the one Python uses. In my case, python is built with Visual Studio 2010, but my DLL is built with VS 2005.

The behaviour I see is that the console shows:

> stdout_test.py

Writing to STDOUT from python, before redirect

Writing to STDOUT from test DLL

While the file stdout_redirect_log.txt ends up containing:

Writing to STDOUT from python, after redirect

In other words, setting sys.stdout failed to redirect the stdout output generated by the DLL. This is unsurprising given the nature of the underlying APIs for stdout redirection in Windows. I have encountered this problem at the native/C++ level before and never found a way to reliably redirect stdout from within a process. It has to be done externally.

This is actually the very reason I am launching a child process - it's so that I can connect externally to its pipes and thus guarantee that I am intercepting all of its output. I can definitely do this by launching the process manually with pywin32, but I would very much like to be able to use the facilities of multiprocessing, in particular the ability to communicate with the child process via a multiprocessing Pipe object, in order to get progress updates. The question is whether there is any way to both use multiprocessing for its IPC facilities and to reliably redirect all of the child's stdout and stderr output to a file.

UPDATE: Looking at the source code for multiprocessing.Processs, it has a static member, _Popen, which looks like it can be used to override the class used to create the process. If it's set to None (default), it uses a multiprocessing.forking._Popen, but it looks like by saying

multiprocessing.Process._Popen = MyPopenClass

I could override the process creation. However, although I could derive this from multiprocessing.forking._Popen, it looks like I would have to copy a bunch of internal stuff into my implementation, which sounds flaky and not very future-proof. If that's the only choice I think I'd probably plump for doing the whole thing manually with pywin32 instead.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

淡莣 2024-12-16 05:10:47

您建议的解决方案是一个很好的解决方案:手动创建进程,以便您可以显式访问其 stdout/stderr 文件句柄。然后,您可以创建一个套接字来与子进程通信,并在该套接字上使用 multiprocessing.connection (multiprocessing.Pipe 创建相同类型的连接对象,因此这应该为您提供所有相同的 IPC 功能)。

这是一个两个文件的示例。

ma​​ster.py:

import multiprocessing.connection
import subprocess
import socket
import sys, os

## Listen for connection from remote process (and find free port number)
port = 10000
while True:
    try:
        l = multiprocessing.connection.Listener(('localhost', int(port)), authkey="secret")
        break
    except socket.error as ex:
        if ex.errno != 98:
            raise
        port += 1  ## if errno==98, then port is not available.

proc = subprocess.Popen((sys.executable, "subproc.py", str(port)), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

## open connection for remote process
conn = l.accept()
conn.send([1, "asd", None])
print(proc.stdout.readline())

subproc.py:

import multiprocessing.connection
import subprocess
import sys, os, time

port = int(sys.argv[1])
conn = multiprocessing.connection.Client(('localhost', port), authkey="secret")

while True:
    try:
        obj = conn.recv()
        print("received: %s\n" % str(obj))
        sys.stdout.flush()
    except EOFError:  ## connection closed
        break

您可能还想查看这个问题从子进程获取非阻塞读取。

The solution you suggest is a good one: create your processes manually such that you have explicit access to their stdout/stderr file handles. You can then create a socket to communicate with the sub-process and use multiprocessing.connection over that socket (multiprocessing.Pipe creates the same type of connection object, so this should give you all the same IPC functionality).

Here's a two-file example.

master.py:

import multiprocessing.connection
import subprocess
import socket
import sys, os

## Listen for connection from remote process (and find free port number)
port = 10000
while True:
    try:
        l = multiprocessing.connection.Listener(('localhost', int(port)), authkey="secret")
        break
    except socket.error as ex:
        if ex.errno != 98:
            raise
        port += 1  ## if errno==98, then port is not available.

proc = subprocess.Popen((sys.executable, "subproc.py", str(port)), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

## open connection for remote process
conn = l.accept()
conn.send([1, "asd", None])
print(proc.stdout.readline())

subproc.py:

import multiprocessing.connection
import subprocess
import sys, os, time

port = int(sys.argv[1])
conn = multiprocessing.connection.Client(('localhost', port), authkey="secret")

while True:
    try:
        obj = conn.recv()
        print("received: %s\n" % str(obj))
        sys.stdout.flush()
    except EOFError:  ## connection closed
        break

You may also want to see the first answer to this question to get non-blocking reads from the subprocess.

把回忆走一遍 2024-12-16 05:10:47

I don't think you have a better option than redirecting a subprocess to a file as you mentioned in your comment.

The way consoles stdin/out/err work in windows is each process when it's born has its std handles defined. You can change them with SetStdHandle. When you modify python's sys.stdout you only modify where python prints out stuff, not where other DLL's are printing stuff. Part of the CRT in your DLL is using GetStdHandle to find out where to print out to. If you want, you can do whatever piping you want in windows API in your DLL or in your python script with pywin32. Though I do think it'll be simpler with subprocess.

软甜啾 2024-12-16 05:10:47

在我的情况下,我更改了 sys.stdout.write 来写入 PySide QTextEdit。我无法读取 sys.stdout 并且我不知道如何将 sys.stdout 更改为可读。我创建了两个管道。一个用于标准输出,另一个用于标准错误。在单独的进程中,我将 sys.stdout 和 sys.stderr 重定向到多处理管道的子连接。在主进程中,我创建了两个线程来读取 stdout 和 stderr 父管道,并将管道数据重定向到 sys.stdout 和 sys.stderr 。

import sys
import contextlib
import threading
import multiprocessing as mp
import multiprocessing.queues
from queue import Empty
import time


class PipeProcess(mp.Process):
    """Process to pipe the output of the sub process and redirect it to this sys.stdout and sys.stderr.

    Note:
        The use_queue = True argument will pass data between processes using Queues instead of Pipes. Queues will
        give you the full output and read all of the data from the Queue. A pipe is more efficient, but may not
        redirect all of the output back to the main process.
    """
    def __init__(self, group=None, target=None, name=None, args=tuple(), kwargs={}, *_, daemon=None,
                 use_pipe=None, use_queue=None):
        self.read_out_th = None
        self.read_err_th = None
        self.pipe_target = target
        self.pipe_alive = mp.Event()

        if use_pipe or (use_pipe is None and not use_queue):  # Default
            self.parent_stdout, self.child_stdout = mp.Pipe(False)
            self.parent_stderr, self.child_stderr = mp.Pipe(False)
        else:
            self.parent_stdout = self.child_stdout = mp.Queue()
            self.parent_stderr = self.child_stderr = mp.Queue()

        args = (self.child_stdout, self.child_stderr, target) + tuple(args)
        target = self.run_pipe_out_target

        super(PipeProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs,
                                          daemon=daemon)

    def start(self):
        """Start the multiprocess and reading thread."""
        self.pipe_alive.set()
        super(PipeProcess, self).start()

        self.read_out_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stdout, sys.stdout))
        self.read_err_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stderr, sys.stderr))
        self.read_out_th.daemon = True
        self.read_err_th.daemon = True
        self.read_out_th.start()
        self.read_err_th.start()

    @classmethod
    def run_pipe_out_target(cls, pipe_stdout, pipe_stderr, pipe_target, *args, **kwargs):
        """The real multiprocessing target to redirect stdout and stderr to a pipe or queue."""
        sys.stdout.write = cls.redirect_write(pipe_stdout)  # , sys.__stdout__)  # Is redirected in main process
        sys.stderr.write = cls.redirect_write(pipe_stderr)  # , sys.__stderr__)  # Is redirected in main process

        pipe_target(*args, **kwargs)

    @staticmethod
    def redirect_write(child, out=None):
        """Create a function to write out a pipe and write out an additional out."""
        if isinstance(child, mp.queues.Queue):
            send = child.put
        else:
            send = child.send_bytes  # No need to pickle with child_conn.send(data)

        def write(data, *args):
            try:
                if isinstance(data, str):
                    data = data.encode('utf-8')

                send(data)
                if out is not None:
                    out.write(data)
            except:
                pass
        return write

    @classmethod
    def read_pipe_out(cls, pipe_alive, pipe_out, out):
        if isinstance(pipe_out, mp.queues.Queue):
            # Queue has better functionality to get all of the data
            def recv():
                return pipe_out.get(timeout=0.5)

            def is_alive():
                return pipe_alive.is_set() or pipe_out.qsize() > 0
        else:
            # Pipe is more efficient
            recv = pipe_out.recv_bytes  # No need to unpickle with data = pipe_out.recv()
            is_alive = pipe_alive.is_set

        # Loop through reading and redirecting data
        while is_alive():
            try:
                data = recv()
                if isinstance(data, bytes):
                    data = data.decode('utf-8')
                out.write(data)
            except EOFError:
                break
            except Empty:
                pass
            except:
                pass

    def join(self, *args):
        # Wait for process to finish (unless a timeout was given)
        super(PipeProcess, self).join(*args)

        # Trigger to stop the threads
        self.pipe_alive.clear()

        # Pipe must close to prevent blocking and waiting on recv forever
        if not isinstance(self.parent_stdout, mp.queues.Queue):
            with contextlib.suppress():
                self.parent_stdout.close()
            with contextlib.suppress():
                self.parent_stderr.close()

        # Close the pipes and threads
        with contextlib.suppress():
            self.read_out_th.join()
        with contextlib.suppress():
            self.read_err_th.join()

def run_long_print():
    for i in range(1000):
        print(i)
        print(i, file=sys.stderr)

    print('finished')


if __name__ == '__main__':
    # Example test write (My case was a QTextEdit)
    out = open('stdout.log', 'w')
    err = open('stderr.log', 'w')

    # Overwrite the write function and not the actual stdout object to prove this works
    sys.stdout.write = out.write
    sys.stderr.write = err.write

    # Create a process that uses pipes to read multiprocess output back into sys.stdout.write
    proc = PipeProcess(target=run_long_print, use_queue=True)  # If use_pipe=True Pipe may not write out all values
    # proc.daemon = True  # If daemon and use_queue Not all output may be redirected to stdout
    proc.start()

    # time.sleep(5)  # Not needed unless use_pipe or daemon and all of stdout/stderr is desired

    # Close the process
    proc.join()  # For some odd reason this blocks forever when use_queue=False

    # Close the output files for this test
    out.close()
    err.close()

In my situation I changed sys.stdout.write to write to a PySide QTextEdit. I couldn't read from sys.stdout and I didn't know how to change sys.stdout to be readable. I created two Pipes. One for stdout and the other for stderr. In the separate process I redirect sys.stdout and sys.stderr to the child connection of the multiprocessing pipe. On the main process I created two threads to read the stdout and stderr parent pipe and redirect the pipe data to sys.stdout and sys.stderr.

import sys
import contextlib
import threading
import multiprocessing as mp
import multiprocessing.queues
from queue import Empty
import time


class PipeProcess(mp.Process):
    """Process to pipe the output of the sub process and redirect it to this sys.stdout and sys.stderr.

    Note:
        The use_queue = True argument will pass data between processes using Queues instead of Pipes. Queues will
        give you the full output and read all of the data from the Queue. A pipe is more efficient, but may not
        redirect all of the output back to the main process.
    """
    def __init__(self, group=None, target=None, name=None, args=tuple(), kwargs={}, *_, daemon=None,
                 use_pipe=None, use_queue=None):
        self.read_out_th = None
        self.read_err_th = None
        self.pipe_target = target
        self.pipe_alive = mp.Event()

        if use_pipe or (use_pipe is None and not use_queue):  # Default
            self.parent_stdout, self.child_stdout = mp.Pipe(False)
            self.parent_stderr, self.child_stderr = mp.Pipe(False)
        else:
            self.parent_stdout = self.child_stdout = mp.Queue()
            self.parent_stderr = self.child_stderr = mp.Queue()

        args = (self.child_stdout, self.child_stderr, target) + tuple(args)
        target = self.run_pipe_out_target

        super(PipeProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs,
                                          daemon=daemon)

    def start(self):
        """Start the multiprocess and reading thread."""
        self.pipe_alive.set()
        super(PipeProcess, self).start()

        self.read_out_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stdout, sys.stdout))
        self.read_err_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stderr, sys.stderr))
        self.read_out_th.daemon = True
        self.read_err_th.daemon = True
        self.read_out_th.start()
        self.read_err_th.start()

    @classmethod
    def run_pipe_out_target(cls, pipe_stdout, pipe_stderr, pipe_target, *args, **kwargs):
        """The real multiprocessing target to redirect stdout and stderr to a pipe or queue."""
        sys.stdout.write = cls.redirect_write(pipe_stdout)  # , sys.__stdout__)  # Is redirected in main process
        sys.stderr.write = cls.redirect_write(pipe_stderr)  # , sys.__stderr__)  # Is redirected in main process

        pipe_target(*args, **kwargs)

    @staticmethod
    def redirect_write(child, out=None):
        """Create a function to write out a pipe and write out an additional out."""
        if isinstance(child, mp.queues.Queue):
            send = child.put
        else:
            send = child.send_bytes  # No need to pickle with child_conn.send(data)

        def write(data, *args):
            try:
                if isinstance(data, str):
                    data = data.encode('utf-8')

                send(data)
                if out is not None:
                    out.write(data)
            except:
                pass
        return write

    @classmethod
    def read_pipe_out(cls, pipe_alive, pipe_out, out):
        if isinstance(pipe_out, mp.queues.Queue):
            # Queue has better functionality to get all of the data
            def recv():
                return pipe_out.get(timeout=0.5)

            def is_alive():
                return pipe_alive.is_set() or pipe_out.qsize() > 0
        else:
            # Pipe is more efficient
            recv = pipe_out.recv_bytes  # No need to unpickle with data = pipe_out.recv()
            is_alive = pipe_alive.is_set

        # Loop through reading and redirecting data
        while is_alive():
            try:
                data = recv()
                if isinstance(data, bytes):
                    data = data.decode('utf-8')
                out.write(data)
            except EOFError:
                break
            except Empty:
                pass
            except:
                pass

    def join(self, *args):
        # Wait for process to finish (unless a timeout was given)
        super(PipeProcess, self).join(*args)

        # Trigger to stop the threads
        self.pipe_alive.clear()

        # Pipe must close to prevent blocking and waiting on recv forever
        if not isinstance(self.parent_stdout, mp.queues.Queue):
            with contextlib.suppress():
                self.parent_stdout.close()
            with contextlib.suppress():
                self.parent_stderr.close()

        # Close the pipes and threads
        with contextlib.suppress():
            self.read_out_th.join()
        with contextlib.suppress():
            self.read_err_th.join()

def run_long_print():
    for i in range(1000):
        print(i)
        print(i, file=sys.stderr)

    print('finished')


if __name__ == '__main__':
    # Example test write (My case was a QTextEdit)
    out = open('stdout.log', 'w')
    err = open('stderr.log', 'w')

    # Overwrite the write function and not the actual stdout object to prove this works
    sys.stdout.write = out.write
    sys.stderr.write = err.write

    # Create a process that uses pipes to read multiprocess output back into sys.stdout.write
    proc = PipeProcess(target=run_long_print, use_queue=True)  # If use_pipe=True Pipe may not write out all values
    # proc.daemon = True  # If daemon and use_queue Not all output may be redirected to stdout
    proc.start()

    # time.sleep(5)  # Not needed unless use_pipe or daemon and all of stdout/stderr is desired

    # Close the process
    proc.join()  # For some odd reason this blocks forever when use_queue=False

    # Close the output files for this test
    out.close()
    err.close()
追星践月 2024-12-16 05:10:47

或者 - 我知道这可能有点偏离主题,但在我的情况下解决了同样的问题 - ,这可以通过 Linux 上的屏幕来解决:

screen -L -Logfile './logfile_%Y-%m-%d.log' python my_multiproc_script.py

这样就不需要实现所有的主子通信

Alternatively - and I know this might be slightly off-topic, but helped in my case for the same problem - , this can be resolved with screen on Linux:

screen -L -Logfile './logfile_%Y-%m-%d.log' python my_multiproc_script.py

this way no need to implement all the master-child communication

哭泣的笑容 2024-12-16 05:10:47

我认为我偏离了基础并且错过了一些东西,但是当我读到你的问题时,我想到的就是它的价值。

如果您可以拦截所有 stdout 和 stderr (我从您的问题中得到了这样的印象),那么为什么不在每个进程周围添加或包装该捕获功能呢?然后将通过队列捕获的内容发送给消费者,消费者可以对所有输出执行任何您想要的操作?

I assume I'm off base and missing something, but for what it's worth here is what came to mind when I read your question.

If you can intercept all of the stdout and stderr (I got that impression from your question), then why not add or wrap that capture functionality around each of your processes? Then send what is captured through a queue to a consumer that can do whatever you want with all of the outputs?

絕版丫頭 2024-12-16 05:10:47

这是捕获 multiprocessing 的标准输出的简单直接的方法.流程

import app
import io
import sys
from multiprocessing import Process


def run_app(some_param):
    sys.stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)
    app.run()

app_process = Process(target=run_app, args=('some_param',))
app_process.start()
# Use app_process.termninate() for python <= 3.7.
app_process.kill() 

Here is the simple and straightforward way for capturing stdout for multiprocessing.Process:

import app
import io
import sys
from multiprocessing import Process


def run_app(some_param):
    sys.stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)
    app.run()

app_process = Process(target=run_app, args=('some_param',))
app_process.start()
# Use app_process.termninate() for python <= 3.7.
app_process.kill() 
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文