Wrapping a subprocess' stdout/stderr

Published 2024-10-05 14:32:30


I'd like to both capture and display the output of a process that I invoke through Python's subprocess module.

I thought I could just pass my file-like object as the named parameters stdout and stderr.

I can see that it accesses the fileno attribute, so it is doing something with the object. However, the write() method is never invoked. Is my approach completely off, or am I just missing something?

import subprocess
import sys

class Process(object):
    class StreamWrapper(object):
        def __init__(self, stream):
            self._stream = stream
            self._buffer = []

        def _print(self, msg):
            print(repr(self), msg)

        def __getattr__(self, name):
            if name not in ['fileno']:
                self._print("# Redirecting: %s" % name)
            return getattr(self._stream, name)

        def write(self, data):
            print("###########")
            self._buffer.append(data)
            self._stream.write(data)
            self._stream.flush()

        def getBuffer(self):
            return self._buffer[:]

    def __init__(self, *args, **kwargs):
        print(">> Running `%s`" % " ".join(args[0]))
        self._stdout = self.StreamWrapper(sys.stdout)
        self._stderr = self.StreamWrapper(sys.stderr)
        kwargs.setdefault('stdout', self._stdout)
        kwargs.setdefault('stderr', self._stderr)
        self._process = subprocess.Popen(*args, **kwargs)
        self._process.communicate()

Update:

Something I'd like to have work as well is ANSI control characters to move the cursor and overwrite previously output content. I don't know whether that is the correct term, but here's an example of what I mean: I'm trying to automate some Git operations, and Git renders progress that updates itself without writing a new line each time.
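That kind of self-updating progress line is usually driven by a plain carriage return rather than full ANSI cursor control. A minimal sketch (the message text is made up for illustration):

```python
import sys
import time

def show_progress(stream=sys.stdout, delay=0.0):
    """Rewrite a single line in place, the way git renders its progress."""
    for pct in range(0, 101, 25):
        # '\r' moves the cursor back to column 0 without emitting a newline,
        # so the next write overwrites the previous text.
        stream.write("\rReceiving objects: %3d%%" % pct)
        stream.flush()
        time.sleep(delay)
    stream.write("\n")

show_progress()
```

Anything that forwards the child's bytes verbatim (rather than line-buffering them) will preserve these `\r` updates.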

Update 2

It is important to me that the output of the subprocess is displayed immediately. I've tried using subprocess.PIPE to capture the output and display it manually, but I was only able to get it to display the output once the process had completed. However, I'd like to see the output in real time.


为你拒绝所有暧昧 2024-10-12 14:32:31


A file-like object is not close enough: it must be an actual file with a real file descriptor. Use subprocess's support for pipes and read from them as appropriate.
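A minimal sketch of that advice: give the child a real pipe, then read line by line so the output can be both captured and echoed as it arrives. (The child command here is just an illustrative `python -c`.)

```python
import subprocess
import sys

proc = subprocess.Popen(
    [sys.executable, "-c", "print('hello'); print('world')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,   # merge stderr into the same pipe
    text=True,
)
captured = []
for line in proc.stdout:        # blocks only until the next line is ready
    captured.append(line)
    sys.stdout.write(line)      # display immediately
proc.wait()
```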

硪扪都還晓 2024-10-12 14:32:31

Here is a function from Catapult, a Chromium CI product (BSD-3-Clause license).

It's non-blocking, and it drains the output. That is, it handles the problems that are present in most answers on SO.

I would also look into os.set_blocking() instead of fcntl here.

os.set_blocking() was introduced in Python 3.5 and has supported Windows pipes since Python 3.12, so from 3.12 on it would also make the code portable to Windows.
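For illustration, this is what that replacement looks like on a plain os.pipe() (a standalone sketch, not part of the Catapult code below):

```python
import os

# Make the read end of a pipe non-blocking with one portable call, instead
# of the fcntl F_GETFL/F_SETFL dance.
r, w = os.pipe()
os.set_blocking(r, False)

try:
    os.read(r, 4096)        # nothing written yet: raises instead of hanging
    raised = False
except BlockingIOError:
    raised = True

os.write(w, b"data")
chunk = os.read(r, 4096)    # data is available now, so the read succeeds

os.close(r)
os.close(w)
```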

import os
import select
import time


def _IterProcessStdoutFcntl(process,
                            iter_timeout=None,
                            timeout=None,
                            buffer_size=4096,
                            poll_interval=1):
  """An fcntl-based implementation of _IterProcessStdout."""
  # pylint: disable=too-many-nested-blocks
  import fcntl
  try:
    # Enable non-blocking reads from the child's stdout.
    child_fd = process.stdout.fileno()
    fl = fcntl.fcntl(child_fd, fcntl.F_GETFL)
    fcntl.fcntl(child_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    end_time = (time.time() + timeout) if timeout else None
    iter_end_time = (time.time() + iter_timeout) if iter_timeout else None

    while True:
      if end_time and time.time() > end_time:
        raise TimeoutError()
      if iter_end_time and time.time() > iter_end_time:
        yield None
        iter_end_time = time.time() + iter_timeout

      if iter_end_time:
        iter_aware_poll_interval = min(poll_interval,
                                       max(0, iter_end_time - time.time()))
      else:
        iter_aware_poll_interval = poll_interval

      read_fds, _, _ = select.select([child_fd], [], [],
                                     iter_aware_poll_interval)
      if child_fd in read_fds:
        data = _read_and_decode(child_fd, buffer_size)
        if not data:
          break
        yield data

      if process.poll() is not None:
        # If process is closed, keep checking for output data (because of timing
        # issues).
        while True:
          read_fds, _, _ = select.select([child_fd], [], [],
                                         iter_aware_poll_interval)
          if child_fd in read_fds:
            data = _read_and_decode(child_fd, buffer_size)
            if data:
              yield data
              continue
          break
        break
  finally:
    try:
      if process.returncode is None:
        # Make sure the process doesn't stick around if we fail with an
        # exception.
        process.kill()
    except OSError:
      pass
    process.wait()
内心激荡 2024-10-12 14:32:31


Is there a reason you have a class nested inside a class? And stdout and stderr can take any file, so simply passing an open file object or a StringIO should be enough to modify the stream:

import sys
sys.stdout = open('test.txt', 'w')
print("Testing!")
sys.stdout.write('\nhehehe')
sys.stdout = sys.__stdout__
sys.exit(0)
旧情勿念 2024-10-12 14:32:30


Stdin, stdout and stderr of a process need to be real file descriptors. (That is actually not a restriction imposed by Python, but rather how pipes work on the OS level.) So you will need a different solution.

If you want to track both stdout and stderr in real time, you will need asynchronous I/O or threads.

  • Asynchronous I/O: With the standard synchronous (= blocking) I/O, a read on one of the streams could block, disallowing access to the other one in real time. If you are on Unix, you can use non-blocking I/O as described in this answer. However, on Windows you will be out of luck with this approach. More on asynchronous I/O in Python, and some alternatives, is shown in this video.

  • Threads: Another common way to deal with this problem is to create one thread for each file descriptor you want to read from in real time. The threads only handle the file descriptor they are assigned to, so blocking I/O won't harm.
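A sketch of the threading variant (the child command is an illustrative `python -c`, and the helper name `_pump` is made up):

```python
import subprocess
import sys
import threading

def _pump(src, dst, sink):
    # Each thread owns exactly one stream, so its blocking reads can
    # never stall the reader of the other stream.
    for line in src:
        sink.append(line)
        dst.write(line)     # echo in real time
    src.close()

proc = subprocess.Popen(
    [sys.executable, "-c",
     "import sys; print('out'); print('err', file=sys.stderr)"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
out_lines, err_lines = [], []
threads = [
    threading.Thread(target=_pump, args=(proc.stdout, sys.stdout, out_lines)),
    threading.Thread(target=_pump, args=(proc.stderr, sys.stderr, err_lines)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
proc.wait()
```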

美煞众生 2024-10-12 14:32:30


Have a look here.

p = subprocess.Popen(cmd,
                     shell=True,
                     bufsize=64,
                     stdin=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     stdout=subprocess.PIPE)
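With those pipes in place, reading in small chunks (instead of communicate()) is what makes the output show up while the child is still running. A sketch, using an illustrative `python -c` child:

```python
import subprocess
import sys

p = subprocess.Popen(
    [sys.executable, "-u", "-c", "print('a'); print('b')"],
    bufsize=0,                      # unbuffered pipe on our side
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
pieces = []
while True:
    chunk = p.stdout.read(64)       # returns as soon as any bytes arrive
    if not chunk:                   # b'' means the child closed its end
        break
    pieces.append(chunk)
    sys.stdout.buffer.write(chunk)  # relay raw bytes immediately
p.wait()
```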