How to replicate tee behavior in Python when using subprocess?

Posted on 2024-09-05 03:14:15

I'm looking for a Python solution that will allow me to save the output of a command in a file without hiding it from the console.

FYI: I'm asking about tee (the Unix command line utility), not the function with the same name from the Python itertools module.

Details

  • Python solution (not calling tee, it is not available under Windows)
  • I do not need to provide any input to stdin for called process
  • I have no control over the called program. All I know is that it will output something to stdout and stderr and return with an exit code.
  • To work when calling external programs (subprocess)
  • To work for both stderr and stdout
  • Being able to differentiate between stdout and stderr, because I may want to display only one of them on the console, or I could try to output stderr using a different color - this means that stderr = subprocess.STDOUT will not work.
  • Live output (progressive) - the process can run for a long time, and I'm not able to wait for it to finish.
  • Python 3 compatible code (important)

References

Here are some incomplete solutions I found so far:

Diagram http://blog.i18n.ro/wp-content/uploads/2010/06/Drawing_tee_py.png

Current code (second try)

#!/usr/bin/python
from __future__ import print_function

import sys, os, time, subprocess, io, threading
cmd = "python -E test_output.py"

from threading import Thread
class StreamThread ( Thread ):
    def __init__(self, buffer):
        Thread.__init__(self)
        self.buffer = buffer
    def run ( self ):
        while 1:
            line = self.buffer.readline()
            print(line,end="")
            sys.stdout.flush()
            if line == '':
                break

proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdoutThread = StreamThread(io.TextIOWrapper(proc.stdout))
stderrThread = StreamThread(io.TextIOWrapper(proc.stderr))
stdoutThread.start()
stderrThread.start()
proc.communicate()
stdoutThread.join()
stderrThread.join()

print("--done--")

#### test_output.py ####

#!/usr/bin/python
from __future__ import print_function
import sys, os, time

for i in range(0, 10):
    if i%2:
        print("stderr %s" % i, file=sys.stderr)
    else:
        print("stdout %s" % i, file=sys.stdout)
    time.sleep(0.1)

Real output

stderr 1
stdout 0
stderr 3
stdout 2
stderr 5
stdout 4
stderr 7
stdout 6
stderr 9
stdout 8
--done--

The expected output was to have the lines in order. Note that modifying the Popen call to use only one PIPE is not allowed, because in real life I will want to do different things with stderr and stdout.

Also, even in the second case I was not able to obtain real-time output; in fact, all the results were received only when the process finished. By default, Popen should use no buffers (bufsize=0).
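For readers who want a starting point, here is a minimal sketch of the thread-based approach described above, written for Python 3. The names pump and run_and_tee and the "ERR: " prefix are made up for illustration and are not from the original post. It assumes the child flushes its own output (for a Python child, running it with -u), because bufsize only affects the parent's side of the pipe and cannot undo buffering done inside the child.

```python
import subprocess
import sys
import threading


def pump(source, console, log, prefix=""):
    """Copy lines from `source` to the console and to a log file as they arrive."""
    for line in source:  # iterating a text-mode pipe yields lines until EOF
        console.write(prefix + line)
        console.flush()
        log.write(line)
        log.flush()


def run_and_tee(cmd, out_path, err_path):
    """Run cmd, echo both streams live, and save each stream to its own file."""
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered on the parent side only
    )
    with open(out_path, "w") as out_log, open(err_path, "w") as err_log:
        threads = [
            threading.Thread(target=pump, args=(proc.stdout, sys.stdout, out_log)),
            threading.Thread(target=pump, args=(proc.stderr, sys.stderr, err_log, "ERR: ")),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()  # each pipe reaches EOF once the child closes it
    proc.stdout.close()
    proc.stderr.close()
    return proc.wait()  # exit code of the child
```

A call such as run_and_tee([sys.executable, "-u", "test_output.py"], "stdout.log", "stderr.log") would echo both streams and keep them in separate files (the log file names are placeholders). The relative order of stdout and stderr lines on the console is still not guaranteed, since the two pipes are read independently.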

Comments (9)

身边 2024-09-12 03:14:15

If requiring Python 3.6 isn't an issue, there is now a way of doing this using asyncio. This method allows you to capture stdout and stderr separately but still have both streams go to the tty, without using threads. Here's a rough outline:

import asyncio
import os
import sys


class RunOutput:
    def __init__(self, returncode, stdout, stderr):
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def _stream_subprocess(cmd, stdin=None, quiet=False, echo=False) -> RunOutput:
    if os.name == "nt":  # stands in for the undefined isWindows() helper in the original outline
        platform_settings = {"env": os.environ}
    else:
        platform_settings = {"executable": "/bin/bash"}
    if echo:
        print(cmd)
    p = await asyncio.create_subprocess_shell(
        cmd,
        stdin=stdin,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        **platform_settings
    )
    out = []
    err = []

    def tee(line, sink, pipe, label=""):
        line = line.decode("utf-8").rstrip()
        sink.append(line)
        if not quiet:
            print(label, line, file=pipe)

    await asyncio.wait(
        [
            _read_stream(p.stdout, lambda l: tee(l, out, sys.stdout)),
            _read_stream(p.stderr, lambda l: tee(l, err, sys.stderr, label="ERR:")),
        ]
    )

    return RunOutput(await p.wait(), out, err)


def run(cmd, stdin=None, quiet=False, echo=False) -> RunOutput:
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(
        _stream_subprocess(cmd, stdin=stdin, quiet=quiet, echo=echo)
    )

    return result

The code above was based on this blog post: https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

微凉 2024-09-12 03:14:15

I see that this is a rather old post but just in case someone is still searching for a way to do this:

import subprocess

proc = subprocess.Popen(["ping", "localhost"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True)  # text mode so lines are str on Python 3

with open("logfile.txt", "w") as log_file:
    while proc.poll() is None:
        # Note: each readline() blocks until that stream produces a line
        line = proc.stderr.readline()
        if line:
            print("err: " + line.strip())
            log_file.write(line)
        line = proc.stdout.readline()
        if line:
            print("out: " + line.strip())
            log_file.write(line)

节枝 2024-09-12 03:14:15

This is a straightforward port of tee(1) to Python.

import sys

sinks = sys.argv[1:]
sinks = [open(sink, "w") for sink in sinks]
sinks.append(sys.stderr)
while True:
    input = sys.stdin.read(1024)
    if input:
        for sink in sinks:
            sink.write(input)
    else:
        break

I'm running on Linux right now but this ought to work on most platforms.


Now for the subprocess part, I don't know how you want to 'wire' the subprocess's stdin, stdout and stderr to your stdin, stdout, stderr and file sinks, but I know you can do this:

import subprocess

callee = subprocess.Popen(
    ["python", "-i"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

Now you can access callee.stdin, callee.stdout and callee.stderr like normal files, enabling the above "solution" to work. If you want to get the callee.returncode, you'll need to make an extra call to callee.poll().

Be careful with writing to callee.stdin: if the process has already exited when you do that, an error may be raised (on Linux, I get IOError: [Errno 32] Broken pipe).
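As a rough illustration of wiring one of those pipes to several sinks, the sketch below copies callee.stdout to every sink and then collects the exit code. The helper names tee_stream and run_with_sinks are invented here. It assumes binary pipes and uses read1() so that data is forwarded as soon as some is available instead of waiting for a full 1024-byte chunk. Only stdout is handled; stderr would need its own reader running concurrently.

```python
import subprocess
import sys


def tee_stream(source, sinks, chunk_size=1024):
    """Copy everything from a binary `source` to every sink, chunk by chunk."""
    while True:
        chunk = source.read1(chunk_size)  # returns whatever is available, b"" at EOF
        if not chunk:
            break
        for sink in sinks:
            sink.write(chunk)
            sink.flush()


def run_with_sinks(cmd, sinks):
    """Run cmd, tee its stdout to all sinks, and return the exit code."""
    callee = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    tee_stream(callee.stdout, sinks)
    callee.stdout.close()
    return callee.wait()  # wait() blocks until exit and returns the return code
```

For example, passing [sys.stdout.buffer, open("log.bin", "wb")] as the sinks would show the output on the console and keep a copy on disk (the file name is a placeholder). Using wait() here instead of poll() means the return code is always set by the time the function returns.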

°如果伤别离去 2024-09-12 03:14:15

This is how it can be done

import sys
from subprocess import Popen, PIPE

with open('log.log', 'w') as log:
    proc = Popen(["ping", "google.com"], stdout=PIPE, encoding='utf-8')
    # Iterate until EOF so output written just before exit is not lost
    for text in proc.stdout:
        log.write(text)
        sys.stdout.write(text)
    proc.wait()

屋顶上的小猫咪 2024-09-12 03:14:15

Based on the community wiki answer, here is an updated version.

  • Added types
  • Use gather instead of wait (wait gives a warning)
  • Don't unnecessarily decode to str
  • Add timeout.

This is a complete file that you can run; the timeout is set to 5 seconds so it should time out.

NOTE: Python buffers stdout by default so you need to use -u everywhere.

#!/usr/bin/env -S python3 -u

import asyncio
from typing import BinaryIO, Callable, Union
import sys

class RunOutput:
    def __init__(self, exit_code: int, stdout: list[bytes], stderr: list[bytes]):
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr


async def _read_stream(stream: asyncio.StreamReader, callback: Callable[[bytes], None]):
    while True:
        line = await stream.readline()
        if len(line) == 0:
            break
        callback(line)


async def _stream_subprocess(command: list[str]) -> RunOutput:
    p = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout: list[bytes] = []
    stderr: list[bytes] = []

    def tee(line: bytes, sink: list[bytes], out: BinaryIO):
        sink.append(line)
        out.write(line)

    assert p.stdout is not None
    assert p.stderr is not None

    await asyncio.gather(
        _read_stream(p.stdout, lambda l: tee(l, stdout, sys.stdout.buffer)),
        _read_stream(p.stderr, lambda l: tee(l, stderr, sys.stderr.buffer)),
    )

    exit_code = await p.wait()

    return RunOutput(exit_code, stdout, stderr)


def run(command: list[str], timeout: Union[int, float, None]) -> RunOutput:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        asyncio.wait_for(_stream_subprocess(command), timeout)
    )


def main():
    if "--count" in sys.argv:
        import time

        for i in range(10):
            print(f"A stdout {i}")
            print(f"B stderr {i}", file=sys.stderr)
            time.sleep(1)
            print(f"C stderr {i}", file=sys.stderr)
            print(f"D stdout {i}")
            time.sleep(1)
    else:
        run(["python3", "-u", __file__, "--", "--count"], 5)

if __name__ == "__main__":
    main()

眼中杀气 2024-09-12 03:14:15

Starting with a simple example using tee (I'll show later that you can do this without tee), you can do the following:

def tee(command, **kwargs):
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs)
    t_out = subprocess.Popen(['tee', '-a', '/dev/stderr'], stdin=p.stdout, stderr=subprocess.PIPE, text=True)
    t_err = subprocess.Popen(['tee', '-a', '/dev/stderr'], stdin=p.stderr, stdout=subprocess.PIPE, text=True)
    return p, t_out, t_err

Here:

  1. We start our command in a subprocess, p, capturing both stderr and stdout.
  2. We start another subprocess, t_out, running tee, and we capture only stderr (allowing stdout to flow out from tee normally)
  3. We do the same for the subprocess t_err, but sending stderr from p and capturing only stdout (allowing stderr to flow normally to stderr)

The end result is that stdout and stderr of your command are output to the terminal normally and also captured in the returned subprocesses.

Suppose a simple program that writes to stderr and stdout:

# test.py
import sys, time
for i in range(10):
    if i % 2 == 0:
        print(i, file=sys.stderr, flush=True)
    else:
        print(i, flush=True)
    time.sleep(0.1)

You could do:

print('starting')
process, t_out, t_err = tee([sys.executable, 'test.py'])
while process.poll() is None:
    time.sleep(0.1)  # wait for process to finish
print('done')
print('stdout:', t_out.stderr.read())
print('stderr:', t_err.stdout.read())

In addition to the program's output, you can see that stdout and stderr are readable by the Python script:

starting
0
1
2
3
4
5
6
7
8
9
done
stdout: 1
3
5
7
9

stderr: 0
2
4
6
8

Without using tee

Note that use of the program tee is not actually necessary. This could just as easily be a pure Python program that reads stdin and tees the output the same way tee does.

For example, the following Python script can be used instead. All it does is read stdin and print it to stdout and stderr.

# tee.py
import sys
for line in sys.stdin:
    print(line, file=sys.stdout, flush=True, end='')
    print(line, file=sys.stderr, flush=True, end='')

Then the first example can be modified like this:

def tee(command, **kwargs):
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs)
    t_out = subprocess.Popen([sys.executable, 'tee.py'], stdin=p.stdout, stderr=subprocess.PIPE, text=True)
    t_err = subprocess.Popen([sys.executable, 'tee.py'], stdin=p.stderr, stdout=subprocess.PIPE, text=True)
    return p, t_out, t_err

The end result is similar to the first example, but doesn't require the program tee.


This solution also doesn't necessarily require the use of the additional subprocesses. It's just one way to do it. The same solution could be done in two threads that consume stderr/stdout of the first subprocess.

CAVEAT: some substantial changes would be needed to guarantee correct order of arrival of messages to the terminal if stdout and stderr are written to at about the same time.
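To make the two-thread variant concrete, here is a small sketch in which each pipe gets a reader thread and both feed one queue, so a single consumer sees lines tagged with their origin. The names _reader and run_merged are hypothetical. The order produced is the order in which the parent received the lines, which is usually close to, but not guaranteed to match, the order the child wrote them.

```python
import queue
import subprocess
import sys
import threading


def _reader(stream, name, q):
    """Push (name, line) for every line of the stream, then a sentinel."""
    for line in stream:
        q.put((name, line))
    q.put((name, None))  # sentinel: this stream has reached EOF


def run_merged(cmd):
    """Yield (stream_name, line) pairs in the order the parent received them."""
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    q = queue.Queue()
    for stream, name in ((proc.stdout, "stdout"), (proc.stderr, "stderr")):
        threading.Thread(target=_reader, args=(stream, name, q), daemon=True).start()
    open_streams = 2
    while open_streams:
        name, line = q.get()
        if line is None:
            open_streams -= 1  # one of the two pipes is done
        else:
            yield name, line
    proc.stdout.close()
    proc.stderr.close()
    proc.wait()
```

A caller could loop with for name, line in run_merged([sys.executable, "test.py"]) and decide per line whether to print it, color it, or write it to a file.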

╰つ倒转 2024-09-12 03:14:15

If you don't want to interact with the process you can use the subprocess module just fine.

Example:

tester.py

import os
import sys

for file in os.listdir('.'):
    print(file)

sys.stderr.write("Oh noes, a shrubbery!")
sys.stderr.flush()
sys.stderr.close()

testing.py

import subprocess

p = subprocess.Popen(['python', 'tester.py'], stdout=subprocess.PIPE,
                     stdin=subprocess.PIPE, stderr=subprocess.PIPE)

stdout, stderr = p.communicate()
print(stdout, stderr)

In your situation you can simply write stdout/stderr to a file first. You can send input to your process's stdin with communicate as well, though I wasn't able to figure out how to continually interact with the subprocess.
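A short sketch of that "write to a file first" idea for Python 3 follows. The function name run_and_save is made up for this example. Like communicate(), subprocess.run waits for the child to exit, so this variant is not live: nothing is shown until the process has finished.

```python
import subprocess
import sys


def run_and_save(cmd, out_path, err_path):
    """Run cmd to completion, save each stream to a file, then echo both."""
    result = subprocess.run(cmd, capture_output=True, text=True)
    with open(out_path, "w") as f:
        f.write(result.stdout)
    with open(err_path, "w") as f:
        f.write(result.stderr)
    # Echo after the fact; stdout and stderr stay separate
    sys.stdout.write(result.stdout)
    sys.stderr.write(result.stderr)
    return result.returncode
```

This is the simplest option when the command is short-lived; for long-running commands a reader per pipe is needed to see output as it is produced.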

染年凉城似染瑾 2024-09-12 03:14:15

On Linux, if you really need something like the tee(2) syscall, you can get it like this:

import os
import ctypes

ld = ctypes.CDLL(None, use_errno=True)

SPLICE_F_NONBLOCK = 0x02


def tee(fd_in, fd_out, length, flags=SPLICE_F_NONBLOCK):
    result = ld.tee(
        ctypes.c_int(fd_in),
        ctypes.c_int(fd_out),
        ctypes.c_size_t(length),
        ctypes.c_uint(flags),
    )

    if result == -1:
        errno = ctypes.get_errno()
        raise OSError(errno, os.strerror(errno))

    return result

To use this, you probably want to use Python 3.10 and something with os.splice (or use ctypes in the same way to get splice). See the tee(2) man page for an example.

关于从前 2024-09-12 03:14:15

My solution isn't elegant, but it works.

You can use powershell to gain access to "tee" under WinOS.

import subprocess
import sys

cmd = ['powershell', 'ping', 'google.com', '|', 'tee', '-a', 'log.txt']

if 'darwin' in sys.platform:
    cmd.remove('powershell')

p = subprocess.Popen(cmd)
p.wait()