How do I feed a subprocess's standard input from a Python iterator?

Posted on 2024-11-27 03:23:18

I am trying to use the subprocess module in Python to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want to have the subprocess read lines from an iterator that produces the input, and then read output lines from the subprocess. There may not be a one-to-one correspondence between input and output lines. How can I feed a subprocess from an arbitrary iterator that returns strings?

Here is some example code that gives a simple test case, and some methods I have tried that don't work for some reason or other:

#!/usr/bin/python
from subprocess import *
# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)

# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")

So how can I have my subprocess read from an iterator line by line while I read from its stdout line by line?

Comments (4)

好倦 2024-12-04 03:23:18

The easy way seems to be to fork and feed the input handle from the child process. Can anyone elaborate on any possible downsides of doing this? Or are there python modules that make it easier and safer?

#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        # os._exit() requires an exit status; exit the forked child immediately.
        os._exit(0)

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)

for line in subproc.stdout:
    print line,

撩人痒 2024-12-04 03:23:18

To feed a subprocess's standard input from a Python iterator:

#!/usr/bin/env python3 
from subprocess import Popen, PIPE

with Popen("sink", stdin=PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)

If you want to read the output at the same time, then you need threads or asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    return await process.wait()

if __name__ == "__main__":
    # asyncio.run() (Python 3.7+) creates and closes the event loop itself.
    # On Windows the default loop has supported subprocess pipes since
    # Python 3.8, so no explicit ProactorEventLoop setup is needed.
    sys.exit(asyncio.run(main()))
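
The threads option mentioned in this answer is not shown, so here is a minimal sketch of it, assuming Python 3 and an iterator that yields bytes. The helper name `stream_through` is made up for illustration and is not part of `subprocess`. A background thread writes to the child's stdin while the caller iterates over the child's stdout, so neither pipe can fill up and block the other side.

```python
import subprocess
import sys
import threading


def stream_through(cmd, chunks):
    """Yield output lines of cmd while a helper thread feeds it chunks (bytes)."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def feed():
        try:
            for chunk in chunks:
                proc.stdin.write(chunk)
        except OSError:
            pass  # e.g. BrokenPipeError: the child stopped reading
        finally:
            try:
                proc.stdin.close()  # EOF tells the child that input is done
            except OSError:
                pass

    feeder = threading.Thread(target=feed, daemon=True)
    feeder.start()
    try:
        for line in proc.stdout:
            yield line
    finally:
        proc.stdout.close()
        feeder.join()
        proc.wait()


if __name__ == "__main__":
    # A tiny Python child that copies stdin to stdout stands in for `cat`.
    echo = [
        sys.executable, "-c",
        "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)",
    ]
    for line in stream_through(echo, (b"hello %d\n" % i for i in range(5))):
        sys.stdout.buffer.write(line)
```

Only one chunk of input is held in memory at a time, because the feeder thread blocks in `write()` whenever the child's pipe is full.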

暖阳 2024-12-04 03:23:18

Follow this recipe: it's an add-on to subprocess which supports asynchronous I/O. This still requires that your subprocess respond to each input line or group of lines with a portion of its output, though.
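
To make that constraint concrete, here is a minimal lockstep sketch. It is not the linked recipe: the `lockstep` helper and the `CHILD` command are hypothetical, and the code assumes Python 3.7+ and a child that answers every input line with exactly one flushed output line. If the child buffers its output or emits a different number of lines, `readline()` blocks forever, which is why the question's "no one-to-one correspondence" case needs threads, asyncio, or a second process instead.

```python
import subprocess
import sys

# Hypothetical child: upper-cases each input line and flushes immediately.
CHILD = [
    sys.executable, "-u", "-c",
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n",
]


def lockstep(cmd, lines):
    """Send one line, then read exactly one reply line, and repeat."""
    with subprocess.Popen(
        cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True, bufsize=1
    ) as proc:
        for line in lines:
            proc.stdin.write(line)
            proc.stdin.flush()  # make sure the child sees the line now
            yield proc.stdout.readline()
        # Leaving the with-block closes both pipes and waits for the child.


if __name__ == "__main__":
    for reply in lockstep(CHILD, ("hello %d\n" % i for i in range(3))):
        print(reply, end="")
```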

撕心裂肺的伤痛 2024-12-04 03:23:18

There is https://github.com/uktrade/iterable-subprocess (full disclosure: created by me) that can do this. For example:

from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode("utf-8") for x in range(100000000))

with iterable_subprocess(['cat'], input_iterator) as output:
    for chunk in output:
        print(chunk)

Note that this yields chunks of bytes rather than lines of strings, and the chunks are not necessarily split at line boundaries. To make an iterable of lines, you can integrate a variant of the answer at https://stackoverflow.com/a/70639580/1319998

import io
from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode() for x in range(100000000))

class FileLikeObject(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def readable(self):
        return True
    def read(self, _):
        return next(self.it, b'')

with iterable_subprocess(['cat'], input_iterator) as output:
    for line in io.TextIOWrapper(FileLikeObject(output), newline="", encoding="utf-8"):
        print(line)
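
If you would rather not wrap the chunks in a file-like object, the same regrouping can be done with a small generator. This is a sketch rather than part of iterable-subprocess: the name `iter_lines` is made up, and it assumes the chunks are bytes in an encoding where the newline byte never occurs inside a multi-byte character (true for UTF-8).

```python
def iter_lines(chunks, encoding="utf-8"):
    """Yield decoded lines (newline kept) from an iterable of byte chunks."""
    pending = b""
    for chunk in chunks:
        pending += chunk
        # Everything before the last newline consists of complete lines;
        # whatever follows it stays buffered until more data arrives.
        *complete, pending = pending.split(b"\n")
        for line in complete:
            yield (line + b"\n").decode(encoding)
    if pending:
        # Trailing data without a final newline.
        yield pending.decode(encoding)


if __name__ == "__main__":
    for line in iter_lines([b"hel", b"lo 0\nhello", b" 1\nhello 2"]):
        print(repr(line))
```

It accepts any iterable of bytes, so the `output` object from the example above could be passed to it directly.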