Read subprocess stdout line by line

Published 2024-09-01 03:52:29


My python script uses subprocess to call a linux utility that is very noisy. I want to store all of the output to a log file and show some of it to the user. I thought the following would work, but the output doesn't show up in my application until the utility has produced a significant amount of output.

# fake_utility.py, just generates lots of output over time
import time

i = 0
while True:
    print(hex(i) * 512)
    i += 1
    time.sleep(0.5)

In the parent process:

import subprocess

proc = subprocess.Popen(['python', 'fake_utility.py'], stdout=subprocess.PIPE)
for line in proc.stdout:
    # the real code does filtering here
    print("test:", line.rstrip())

The behavior I really want is for the filter script to print each line as it is received from the subprocess, like tee does but within Python code.

What am I missing? Is this even possible?


無心 2024-09-08 03:52:29

I think the problem is with the statement for line in proc.stdout, which reads the entire input before iterating over it. The solution is to use readline() instead:

# filters output
import subprocess

proc = subprocess.Popen(['python', 'fake_utility.py'], stdout=subprocess.PIPE)
while True:
    line = proc.stdout.readline()
    if not line:
        break
    # the real code does filtering here
    print("test:", line.rstrip())

Of course you still have to deal with the subprocess' buffering.

Note: according to the documentation the solution with an iterator should be equivalent to using readline(), except for the read-ahead buffer, but (or exactly because of this) the proposed change did produce different results for me (Python 2.5 on Windows XP).
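To get the tee-like behaviour the question asks for (log everything, show a subset), the same readline() loop can be extended. A minimal sketch; the inline child command, the log file name, and the `keep` filter condition are all placeholders for the real utility and filtering logic:

```python
import subprocess
import sys

# Stand-in child process; replace with the real noisy utility.
proc = subprocess.Popen(
    [sys.executable, '-c', 'print("keep 1"); print("drop 2"); print("keep 3")'],
    stdout=subprocess.PIPE,
    universal_newlines=True,
)

shown = []
with open('utility.log', 'w') as log:  # log file name is a placeholder
    while True:
        line = proc.stdout.readline()
        if not line:
            break
        log.write(line)              # everything goes to the log, like tee
        if line.startswith('keep'):  # placeholder filter condition
            shown.append(line.rstrip())
            print(line.rstrip())
proc.wait()
```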

天荒地未老 2024-09-08 03:52:29

Bit late to the party, but was surprised not to see what I think is the simplest solution here:

import io
import subprocess

proc = subprocess.Popen(["prog", "arg"], stdout=subprocess.PIPE)
for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):  # or another encoding
    print(line, end='')  # do something with each line

(This requires Python 3.)

傲娇萝莉攻 2024-09-08 03:52:29

Indeed, if you sorted out the iterator then buffering could now be your problem. You could tell the python in the sub-process not to buffer its output.

proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)

becomes

proc = subprocess.Popen(['python','-u', 'fake_utility.py'],stdout=subprocess.PIPE)

I have needed this when calling python from within python.
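A self-contained sketch of the `-u` approach, using `sys.executable` and an inline `-c` script as stand-ins for `'python'` and `fake_utility.py`:

```python
import subprocess
import sys

# Inline stand-in for fake_utility.py; '-u' forces the child Python to run
# unbuffered so each line reaches the pipe as soon as it is printed.
child = '[print("tick", i) for i in range(3)]'

proc = subprocess.Popen(
    [sys.executable, '-u', '-c', child],
    stdout=subprocess.PIPE,
    universal_newlines=True,
)
lines = [line.rstrip() for line in proc.stdout]
proc.wait()
```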

小糖芽 2024-09-08 03:52:29

A function that allows iterating over both stdout and stderr concurrently, in realtime, line by line

In case you need to get the output stream for both stdout and stderr at the same time, you can use the following function.

The function uses Queues to merge both Popen pipes into a single iterator.

Here we create the function read_popen_pipes():

from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

read_popen_pipes() in use:

import subprocess as sp


with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):

        # Do stuff with each line, e.g.:
        print(out_line, end='')
        print(err_line, end='')


rc = p.poll()  # status code of the finished process
等风来 2024-09-08 03:52:29

You want to pass these extra parameters to subprocess.Popen:

bufsize=1, universal_newlines=True

Then you can iterate as in your example. (Tested with Python 3.5)
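A sketch of the resulting call; the inline child command here is a stand-in for the real program:

```python
import subprocess
import sys

# Stand-in child command; bufsize=1 requests line buffering on the parent's
# side of the pipe, and universal_newlines=True makes proc.stdout yield str.
proc = subprocess.Popen(
    [sys.executable, '-c', 'print("hello"); print("world")'],
    stdout=subprocess.PIPE,
    bufsize=1,
    universal_newlines=True,
)
lines = []
for line in proc.stdout:
    lines.append(line.rstrip())
proc.wait()
```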

东京女 2024-09-08 03:52:29

The subprocess module has come a long way since 2010, and most of the answers here are quite outdated.

Here is a simple way working for modern Python versions:

from subprocess import Popen, PIPE, STDOUT

with Popen(args, stdout=PIPE, stderr=STDOUT, text=True) as proc:
    for line in proc.stdout:
        print(line)
rc = proc.returncode

About using Popen as a context-manager (supported since Python 3.2): on exit of the with block, standard file descriptors are closed, and the process is waited / returncode attribute set. See subprocess.py:Popen.__exit__ in CPython sources.

扛刀软妹 2024-09-08 03:52:29

You can also read all the lines without a loop; note that readlines() blocks until the process closes its stdout, so it does not stream. Works in Python 3.6.

import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
list_of_byte_strings = process.stdout.readlines()
不爱素颜 2024-09-08 03:52:29

Python 3.5 added run() to the subprocess module, which returns a CompletedProcess object (the capture_output parameter used below arrived in 3.7). With this you are fine using proc.stdout.splitlines():

proc = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
for line in proc.stdout.splitlines():
    print("stdout:", line)

Note that run() waits for the command to finish, so this does not stream output line by line.

See also How to Execute Shell Commands in Python Using the Subprocess Run Method

烟柳画桥 2024-09-08 03:52:29

I tried this with python3 and it worked, source

When you use Popen to spawn the new process, you tell the operating system to PIPE the stdout of the child process so the parent process can read it; here, stderr is merged into the same pipe via stderr=subprocess.STDOUT.

In output_reader we read each line of the child's stdout by wrapping readline in an iterator that yields a line whenever a new one is ready.

import subprocess
import threading
import time


def output_reader(proc):
    for line in iter(proc.stdout.readline, b''):
        print('got line: {0}'.format(line.decode('utf-8')), end='')


def main():
    proc = subprocess.Popen(['python', 'fake_utility.py'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        # Parent does its own work while the reader thread consumes output.
        time.sleep(0.2)
        i = 0
        while True:
            print(hex(i) * 512)
            i += 1
            time.sleep(0.5)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()
如何视而不见 2024-09-08 03:52:29

The following modification of Rômulo's answer works for me on Python 2 and 3 (2.7.12 and 3.6.1):

import os
import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
while True:
  line = process.stdout.readline()
  if line:  # b'' on Python 3, '' on Python 2 at EOF, both falsy
    os.write(1, line)
  else:
    break
西瓜 2024-09-08 03:52:29

I was having a problem with the arg list of Popen to update servers, the following code resolves this a bit.

import getpass
from subprocess import Popen, PIPE

username = 'user1'
ip = '127.0.0.1'

print ('What is the password?')
password = getpass.getpass()
cmd1 = f"""sshpass -p {password} ssh {username}@{ip}"""
cmd2 = f"""echo {password} | sudo -S apt update"""
cmd3 = " && "
cmd4 = f"""echo {password} | sudo -S apt upgrade -y"""
cmd5 = " && "
cmd6 = "exit"
commands = [cmd1, cmd2, cmd3, cmd4, cmd5, cmd6]

command = " ".join(commands)

cmd = command.split()

with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line, end='')

And to run the update on a local computer, the following code example does this.

import getpass
from subprocess import Popen, PIPE

print ('What is the password?')
password = getpass.getpass()

cmd1_local = f"""apt update"""
cmd2_local = f"""apt upgrade -y"""
commands = [cmd1_local, cmd2_local]

with Popen(['echo', password], stdout=PIPE) as auth:
    for cmd in commands:
        cmd = cmd.split()
        with Popen(['sudo','-S'] + cmd, stdin=auth.stdout, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
            for line in p.stdout:
                print(line, end='')
烟雨扶苏 2024-09-08 03:52:29

An improved version of https://stackoverflow.com/a/57093927/2580077, suitable for Python 3.10.

A function to iterate over both stdout and stderr of the process in parallel.

Improvements:

  • A unified queue maintains the order of entries across stdout and stderr.
  • Yields all lines available in stdout and stderr at once - useful when the calling process is slower than the child.
  • Sleeps in the loop to keep it from utilizing 100% of the CPU.

import time
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor

def enqueue_output(file, queue, level):
    for line in file:
        queue.put((level, line))
    file.close()


def read_popen_pipes(p, blocking_delay=0.5):

    with ThreadPoolExecutor(2) as pool:
        q = Queue()

        pool.submit(enqueue_output, p.stdout, q, 'stdout')
        pool.submit(enqueue_output, p.stderr, q, 'stderr')

        while True:
            if p.poll() is not None and q.empty():
                break

            lines = []
            while not q.empty():
                lines.append(q.get_nowait())

            if lines:
                yield lines

            # otherwise, loop will run as fast as possible and utilizes 100% of the CPU
            time.sleep(blocking_delay)

Usage:

with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, universal_newlines=True) as p:
    for lines in read_popen_pipes(p):
        # lines - all the log entries since the last loop run.
        print('ext cmd', lines)
        # process lines
落日海湾 2024-09-08 03:52:29

I came here with the same problem, and found that none of the provided answers really worked for me. The closest was adding sys.stdout.flush() to the child process, which works but means modifying that process, which I didn't want to do.

Setting the bufsize=1 in the Popen() didn't seem to have any effect for my use case. I guess the problem is that the child process is buffering, regardless of how I call the Popen().

However, I found this question with similar problem (How can I flush the output of the print function?) and one of the answers is to set the environment variable PYTHONUNBUFFERED=1 when calling Popen. This works how I want it to, i.e. real-time line-by-line reading of the output of the child process.
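A sketch of passing that variable through Popen's env parameter; the inline `-c` child script stands in for the real child process:

```python
import os
import subprocess
import sys

# Copy the current environment and force the child Python to be unbuffered,
# so its lines reach the pipe as soon as they are printed.
env = dict(os.environ, PYTHONUNBUFFERED='1')

proc = subprocess.Popen(
    [sys.executable, '-c', 'print("line 1"); print("line 2")'],
    stdout=subprocess.PIPE,
    universal_newlines=True,
    env=env,
)
received = [line.rstrip() for line in proc.stdout]
proc.wait()
```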

童话 2024-09-08 03:52:29

On Linux (and presumably OSX), sometimes the parent process doesn't see the output immediately because the child process is buffering its output (see this article for a more detailed explanation).

If the child process is a Python program, you can disable this by setting the environment variable PYTHONUNBUFFERED to 1 as described in this answer.

If the child process is not a Python program, you can sometimes trick it into running in line-buffered mode by creating a pseudo-terminal like so:

import os
import pty
import subprocess

# Open a pseudo-terminal
master_fd, slave_fd = pty.openpty()

# Open the child process on the slave end of the PTY
with subprocess.Popen(
        ['python', 'fake_utility.py'],
        stdout=slave_fd,
        stdin=slave_fd,
        stderr=slave_fd) as proc:

    # Close our copy of the slave FD (without this we won't notice
    # when the child process closes theirs)
    os.close(slave_fd)

    # Convert the master FD into a file-like object
    with open(master_fd, 'r') as stdout:
        try:
            for line in stdout:
                # Do the actual filtering here
                print("test:", line.rstrip())
        except OSError:
            # This happens when the child process closes its STDOUT,
            # usually when it exits
            pass

If the child process needs to read from STDIN, you can get away without the stdin=slave_fd argument to subprocess.Popen(), as the child process should be checking the status of STDOUT (not STDIN) when it decides whether or not to use line-buffering.

Finally, some programs may actually directly open and write to their controlling terminal instead of writing to STDOUT. If you need to catch this case, you can use the setsid utility by replacing ['python', 'fake_utility.py'] with ['setsid', 'python', 'fake_utility.py'] in the call to subprocess.Popen().
