blocks - send input to python subprocess pipeline

I'm testing subprocess pipelines with Python. I'm aware that I can do what the programs below do in Python directly, but that's not the point. I just want to test the pipeline so I know how to use it.

My system is Linux Ubuntu 9.04 with default python 2.6.

I started with this documentation example.

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

That works, but since p1's stdin is not being redirected, I have to type stuff in the terminal to feed the pipe. When I type ^D closing stdin, I get the output I want.

However, I want to send data to the pipe using a python string variable. First I tried writing on stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Didn't work. I tried using p2.stdout.read() on the last line instead, but it also blocks. I added p1.stdin.flush() and p1.stdin.close(), but that didn't work either. Then I moved to communicate():

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

So that's still not it.

I noticed that running a single process (like p1 above, removing p2) works perfectly. And passing a file handle to p1 (stdin=open(...)) also works. So the problem is:

Is it possible to pass data to a pipeline of 2 or more subprocesses in python, without blocking? Why not?

I'm aware I could run a shell and run the pipeline in the shell, but that's not what I want.


UPDATE 1: Following Aaron Digulla's hint below, I'm now trying to use threads to make it work.

First I've tried running p1.communicate on a thread.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Okay, that didn't work. I tried other combinations, like changing it to .write() and also p2.read(). Nothing. Now let's try the opposite approach:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

The code ends up blocking somewhere: in the spawned thread, in the main thread, or both. So that didn't work either. If you know how to make it work, it would help if you could provide working code. I'm still trying here.


UPDATE 2

Paul Du Bois answered below with some information, so I did more tests.
I've read the entire subprocess.py module and understood how it works, so I tried applying exactly that to my code.

I'm on Linux, but since I was testing with threads, my first approach was to replicate the exact Windows threading code from subprocess.py's communicate() method, but for two processes instead of one. Here's the entire listing of what I tried:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Well. It didn't work. Even after p1.stdin.close() was called, p2.stdout.read() still blocks.

Then I tried the POSIX code from subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

This one also blocks, on select.select(). By spreading print statements around, I found out the following:

  • Reading works. The code reads many times during execution.
  • Writing also works. Data is written to p1.stdin.
  • At the end of numwrites, p1.stdin.close() is called.
  • When select() starts blocking, only to_read has something (p2.stdout); to_write is already empty.
  • The os.read() call always returns something, so p2.stdout.close() is never called.

Conclusion from both tests: closing the stdin of the first process in the pipeline (grep in the example) does not make it flush its buffered output to the next process and exit.

Is there no way to make it work?

PS: I don't want to use a temporary file; I've already tested with files and I know that works. And I don't want to use Windows.

Comments (11)

江湖彼岸 2024-08-15 13:07:47

I found out how to do it.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call those a and b.

When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen - close_fds=False.

The effect of that is that cut also inherits a. So grep can't die even if I close a, because stdin is still open in cut's process (cut ignores it).

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on Unix systems. On Windows it closes all fds, so it prevents piping.

EDIT:

PS: For people with a similar problem reading this answer: as pooryorick said in a comment, this could also block if the data written to p1.stdin is bigger than the pipe buffers. In that case you should chunk the data into smaller pieces and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.

EDIT2: Found another solution, with more help from pooryorick - instead of using close_fds=True and closing ALL fds, one can close the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn argument to Popen comes in very handy to do just that. On executing p2 you can do:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
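
For completeness, here is a minimal self-contained sketch of that approach (a sketch, not part of the original answer: it applies the one-liner above to the grep/cut pipeline from the question, substituting os.devnull for the unspecified devnull handle):

import os
from subprocess import Popen, PIPE

devnull = open(os.devnull, 'wb')
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
# preexec_fn runs in the forked child just before exec, so it closes
# the child's inherited copy of p1.stdin without touching the parent's.
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           stderr=devnull, preexec_fn=p1.stdin.close)
p1.stdin.write('Hello World\n')
p1.stdin.close()
assert p2.stdout.read() == "Hello Worl\n"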
忆梦 2024-08-15 13:07:47

Working with large files

Two principles need to be applied uniformly when working with large files in Python.

  1. Since any IO routine can block, we must keep each stage of the pipeline in a different thread or process. We use threads in this example, but subprocesses would let you avoid the GIL.
  2. We must use incremental reads and writes so that we don't wait for EOF before starting to make progress.

An alternative is to use nonblocking IO, though this is cumbersome in standard Python. See gevent for a lightweight threading library that implements the synchronous IO API using nonblocking primitives.

Example code

We'll construct a silly pipeline that is roughly

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

where each stage in braces {} is implemented in Python while the others use standard external programs. TL;DR: See this gist.

We start with the expected imports.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python stages of the pipeline

All but the last Python-implemented stage of the pipeline needs to go in a thread so that its IO does not block the others. These could instead run in Python subprocesses if you wanted them to actually run in parallel (avoid the GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Each of these needs to be put in its own thread, which we'll do using this convenience function.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Create the pipeline

Create the external stages using Popen and the Python stages using spawn. The argument bufsize=-1 says to use the system default buffering (usually 4 kiB). This is generally faster than the default (unbuffered) or line buffering, but you'll want line buffering if you want to visually monitor the output without lags.

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Drive the pipeline

Assembled as above, all the buffers in the pipeline will fill up, but since nobody is reading from the end (grepz.stdout), they will all block. We could read the entire thing in one call to grepz.stdout.read(), but that would use a lot of memory for large files. Instead, we read incrementally.

for line in grepz.stdout:
    sys.stdout.write(line.lower())

The threads and processes clean up once they reach EOF. We can explicitly clean up using

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 and earlier

Internally, subprocess.Popen calls fork, configures the pipe file descriptors, and calls exec. The child process from fork has copies of all file descriptors in the parent process, and both copies will need to be closed before the corresponding reader will get EOF. This can be fixed by manually closing the pipes (either by close_fds=True or a suitable preexec_fn argument to subprocess.Popen) or by setting the FD_CLOEXEC flag to have exec automatically close the file descriptor. This flag is set automatically in Python-2.7 and later, see issue12786. We can get the Python-2.7 behavior in earlier versions of Python by calling

p._set_cloexec_flag(p.stdin)

before passing p.stdin as an argument to a subsequent subprocess.Popen.
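
If you would rather not rely on a private method, the same effect can be had with the fcntl module directly. A sketch (the helper name is mine, not part of subprocess):

import fcntl

def set_cloexec(fileobj):
    # Mark the descriptor close-on-exec so that children spawned
    # later by fork+exec do not inherit it.
    fd = fileobj.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

set_cloexec(p.stdin)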

江南月 2024-08-15 13:07:47

There are three main tricks to making pipes work as expected

  1. Make sure each end of the pipe is used in a different thread/process
    (some of the examples near the top suffer from this problem).

  2. Explicitly close the unused end of the pipe in each process

  3. Deal with buffering by either disabling it (Python's -u option), using
    pty's, or simply filling up the buffer with something that won't affect
    the data (maybe '\n', but whatever fits).

The examples in the Python "pipeline" module (I'm the author) fit your scenario
exactly, and make the low-level steps fairly clear.

http://pypi.python.org/pypi/pipeline/

More recently, I used the subprocess module as part of a
producer-processor-consumer-controller pattern:

http://www.darkarchive.org/w/Pub/PythonInteract

This example deals with buffered stdin without resorting to a pty, and
also illustrates which pipe ends should be closed where. I prefer processes to
threading, but the principle is the same. Additionally, it illustrates
synchronizing the Queues which feed the producer and collect output from the
consumer, and how to shut them down cleanly (look out for the sentinels
inserted into the queues). This pattern allows new input to be generated based
on recent output, allowing for recursive discovery and processing.
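
A rough sketch of that queue-based pattern (the names and the None sentinel are illustrative, not taken from the pipeline module):

import threading
from Queue import Queue
from subprocess import Popen, PIPE

def feeder(q, stdin):
    # Pull items from the input queue and write them to the pipe;
    # a None sentinel means: close the pipe and stop.
    while True:
        item = q.get()
        if item is None:
            stdin.close()
            break
        stdin.write(item)

def reader(q, stdout):
    # Push output lines onto the result queue; signal EOF with None.
    for line in stdout:
        q.put(line)
    q.put(None)

p = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
in_q, out_q = Queue(), Queue()
threading.Thread(target=feeder, args=(in_q, p.stdin)).start()
threading.Thread(target=reader, args=(out_q, p.stdout)).start()

for i in xrange(100000):
    in_q.put('hello world\n')
in_q.put(None)  # sentinel: no more input

while True:
    line = out_q.get()
    if line is None:
        break
    # new input could be generated here, based on this output line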

唯憾梦倾城 2024-08-15 13:07:47

Nosklo's offered solution will quickly break if too much data is written to the receiving end of the pipe:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

If this script doesn't hang on your machine, just increase "20000" to something that exceeds the size of your operating system's pipe buffers.

This is because the operating system is buffering the input to "grep", but once that buffer is full, the p1.stdin.write call will block until something reads from p2.stdout. In toy scenarios you can get away with writing to and reading from a pipe in the same process, but in normal usage it is necessary to write from one thread/process and read from a separate thread/process. This is true for subprocess.Popen, os.pipe, os.popen*, etc.
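
A minimal sketch of the usual fix, moving the writes into their own thread so they can block without stopping the reader (this keeps close_fds=True from the accepted answer, since threads alone don't fix the inherited-descriptor problem):

import threading
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           close_fds=True)

def feed():
    # All writing happens here, so a full pipe buffer only ever
    # blocks this thread, never the read below.
    p1.stdin.write('Hello World\n' * 20000)
    p1.stdin.close()

t = threading.Thread(target=feed)
t.start()
result = p2.stdout.read()  # drains the pipeline while the thread writes
t.join()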

Another twist is that sometimes you want to keep feeding the pipe with items generated from earlier output of the same pipe. The solution is to make both the pipe feeder and the pipe reader asynchronous to the main program, and to implement two queues: one between the main program and the pipe feeder, and one between the main program and the pipe reader. PythonInteract is an example of that.

subprocess is a nice convenience module, but because it hides the details of the os.popen and os.fork calls it makes under the hood, it can sometimes be more difficult to deal with than the lower-level calls it utilizes. For this reason, subprocess is not a good way to learn about how inter-process pipes really work.

幸福%小乖 2024-08-15 13:07:47

You must do this in several threads. Otherwise, you'll end up in a situation where you can't send data: child p1 won't read your input since p2 doesn't read p1's output because you don't read p2's output.

So you need a background thread that reads what p2 writes out. That will allow p2 to continue after writing some data to the pipe, so it can read the next line of input from p1 which again allows p1 to process the data which you send to it.

Alternatively, you can send the data to p1 with a background thread and read the output from p2 in the main thread. But either side must be a thread.
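A sketch of that arrangement - a background thread drains p2 while the main thread feeds p1 (close_fds=True is taken from the accepted answer below, since without it grep never sees EOF):

import threading
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           close_fds=True)

chunks = []
def drain():
    # Background reader: keeps p2's output flowing so every stage
    # upstream can keep making progress.
    for line in p2.stdout:
        chunks.append(line)

t = threading.Thread(target=drain)
t.start()
p1.stdin.write('hello world\n' * 100000)  # may block, but the reader runs
p1.stdin.close()
t.join()
print ''.join(chunks)[:22]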

装纯掩盖桑 2024-08-15 13:07:47

Responding to nosklo's assertion (see other comments to this question) that it can't be done without close_fds=True:

close_fds=True is only necessary if you've left other file
descriptors open. When opening multiple child processes, it's always good to
keep track of open files that might get inherited, and to explicitly close any
that aren't needed:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds defaults to False because subprocess
prefers to trust the calling program to know what it's doing with open file
descriptors, and just provide the caller with an easy option to close them all
if that's what it wants to do.

But the real issue is that pipe buffers will bite you for all but toy examples.
As I have said in my other answers to this question, the rule of thumb is to
not have your reader and your writer open in the same process/thread. Anyone
who wants to use the subprocess module for two-way communication would be
well-served to study os.pipe and os.fork, first. They're actually not that
hard to use if you have a good example to look at.
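In that spirit, here is a bare-bones os.pipe/os.fork sketch (mine, not from the pipeline module) showing where each unused pipe end gets closed:

import os

r, w = os.pipe()  # low-level read/write descriptor pair
pid = os.fork()

if pid == 0:  # child: the reader
    os.close(w)  # close the unused write end, or EOF never arrives
    total = 0
    while True:
        chunk = os.read(r, 1024)
        if not chunk:
            break
        total += len(chunk)
    os.close(r)
    os.write(1, 'child got %d bytes\n' % total)
    os._exit(0)
else:  # parent: the writer
    os.close(r)  # close the unused read end
    for i in xrange(1000):
        os.write(w, 'hello world\n')
    os.close(w)  # this delivers EOF to the child
    os.waitpid(pid, 0)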

随梦而飞# 2024-08-15 13:07:47

I think you may be examining the wrong problem. Certainly, as Aaron says, if you try to be both a producer to the beginning of a pipeline and a consumer of the end of the pipeline, it is easy to get into a deadlock situation. This is the problem that communicate() solves.

communicate() isn't exactly correct for you since stdin and stdout are on different subprocess objects; but if you take a look at the implementation in subprocess.py you'll see that it does exactly what Aaron suggested.

Once you see that communicate() both reads and writes, you'll see that in your second try communicate() competes with p2 for the output of p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

I am running on win32, which definitely has different i/o and buffering characteristics, but this works for me:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

I tuned the input size to produce a deadlock when using a naive unthreaded p2.read().

You might also try buffering into a file, e.g.:

import os
import tempfile
from subprocess import Popen, PIPE

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

That also works for me without deadlocks.

左秋 2024-08-15 13:07:47

In one of the comments above, I challenged nosklo to either post some code to back up his assertions about select.select or to upvote my responses he had previously down-voted. He responded with the following code:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

One problem with this script is that it second-guesses the size/nature of the
system pipe buffers. The script would experience fewer failures if it could remove
magic numbers like 1024.

The big problem is that this script only works consistently with the right
combination of data input and external programs. grep and cut both work with
lines, and so their internal buffers behave a bit differently. If we use a
more generic command like "cat" and write smaller bits of data into the pipe,
the fatal race condition will pop up more often:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

In this case, two different results will manifest:

write, write, close file, read -> success
write, read -> hang

So again, I challenge nosklo to either post code showing the use of
select.select to handle arbitrary input and pipe buffering from a
single thread, or to upvote my responses.

Bottom line: don't try to manipulate both ends of a pipe from a single thread.
It's just not worth it. See
pipeline for a nice low-level
example of how to do this correctly.

御弟哥哥 2024-08-15 13:07:47

What about using a SpooledTemporaryFile? This bypasses (but perhaps doesn't solve) the issue:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

You can write to it like a file, but it's actually a memory block.

Or am I totally misunderstanding...
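
For instance, a sketch of that idea (my example, not from the original answer; note that Popen needs a real file descriptor, so its fileno() call rolls the spooled data out to an actual temp file):

import tempfile
from subprocess import Popen, PIPE

buf = tempfile.SpooledTemporaryFile(max_size=1 << 20)
buf.write('hello world\n' * 100000)
buf.seek(0)
# Popen(stdin=buf) calls buf.fileno(), which forces the in-memory
# buffer to roll over to a real temporary file on disk.
p1 = Popen(["grep", "-v", "not"], stdin=buf, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.communicate()[0][:22]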

浸婚纱 2024-08-15 13:07:47

Here's an example of using Popen together with os.fork to accomplish the same
thing. Instead of using close_fds it just closes the pipes at the
right places. Much simpler than trying to use select.select, and
takes full advantage of system pipe buffers.

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()
离线来电— 2024-08-15 13:07:47

It's much simpler than you think!

import sys
from subprocess import Popen, PIPE

# Pipe the command here. It will read from stdin.
#   So cat a file, to stdin, like (cat myfile | ./this.py),
#     or type on terminal and hit control+d when done, etc
#   No need to handle this yourself, that's why we have shells!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)

nextData = None
while True:
    nextData = p.stdout.read()
    if nextData in (b'', ''):
        break
    sys.stdout.write ( nextData.decode('utf-8') )


p.wait()

This code is written for Python 3.6, and works with Python 2.7.

Use it like:

cat README.md  | python ./example.py

or

python example.py < README.md

To pipe the contents of "README.md" to this program.

But... at this point, why not just use "cat" directly and pipe the output the way you want? Like:

cat filename | grep -v not | cut -c 1-10

typed into the console will do the job as well. I personally would only use the code option if I were further processing the output; otherwise, a shell script would be easier to maintain and keep around.

You just use the shell to do the piping for you. In one end, out the other. That's what shells are GREAT at doing: managing processes and managing single-width chains of input and output. Some would call it a shell's best non-interactive feature.
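
If the goal is still to feed the pipeline from a Python string, the same trick works with stdin=PIPE: the shell runs the whole pipeline as one child, so a single communicate() call handles the plumbing without deadlock (a sketch, not from the original answer):

from subprocess import Popen, PIPE

# One child (the shell) runs the entire pipeline, so Python only
# has to manage a single stdin/stdout pair.
p = Popen("grep -v not | cut -c 1-10", shell=True, stdin=PIPE, stdout=PIPE)
output = p.communicate(b'hello world\n' * 100000)[0]
print(output[:22])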
