使用 subprocess.Popen 时将大量数据通过管道传输到 stdin
我有点难以理解解决这个简单问题的 python 方法是什么。
我的问题很简单。如果您使用以下代码,它将挂起。这在子流程模块文档中有详细记录。
import subprocess
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
for i in range(100000):
proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output
寻找一个解决方案(有一个非常有洞察力的线程,但我现在失去了它)我发现这个解决方案(除其他外)使用显式分叉:
import os
import sys
from subprocess import Popen, PIPE
def produce(to_sed):
for i in range(100000):
to_sed.write("%d\n" % i)
to_sed.flush()
#this would happen implicitly, anyway, but is here for the example
to_sed.close()
def consume(from_sed):
while 1:
res = from_sed.readline()
if not res:
sys.exit(0)
#sys.exit(proc.poll())
print 'received: ', [res]
def main():
proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
to_sed = proc.stdin
from_sed = proc.stdout
pid = os.fork()
if pid == 0 :
from_sed.close()
produce(to_sed)
return
else :
to_sed.close()
consume(from_sed)
if __name__ == '__main__':
main()
虽然这个解决方案在概念上很容易理解,但它使用了一个多进程并且与子流程模块相比,其级别太低(只是为了隐藏此类内容......)。
我想知道:是否有一个简单而干净的解决方案,使用不会挂起的子流程模块,或者要实现这个模式,我必须退一步并实现旧式选择循环或显式分叉?
谢谢
I'm kind of struggling to understand what is the python way of solving this simple problem.
My problem is quite simple. If you use the follwing code it will hang. This is well documented in the subprocess module doc.
import subprocess
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
for i in range(100000):
proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output
Searching for a solution (there is a very insightful thread, but I've lost it now) I found this solution (among others) that uses an explicit fork:
import os
import sys
from subprocess import Popen, PIPE
def produce(to_sed):
for i in range(100000):
to_sed.write("%d\n" % i)
to_sed.flush()
#this would happen implicitly, anyway, but is here for the example
to_sed.close()
def consume(from_sed):
while 1:
res = from_sed.readline()
if not res:
sys.exit(0)
#sys.exit(proc.poll())
print 'received: ', [res]
def main():
proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
to_sed = proc.stdin
from_sed = proc.stdout
pid = os.fork()
if pid == 0 :
from_sed.close()
produce(to_sed)
return
else :
to_sed.close()
consume(from_sed)
if __name__ == '__main__':
main()
While this solution is conceptually very easy to understand, it uses one more process and stuck as too low level compared to the subprocess module (that is there just to hide this kind of things...).
I'm wondering: is there a simple and clean solution using the subprocess module that won't hung or to implement this patter I have to do a step back and implement an old-style select loop or an explicit fork?
Thanks
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(10)
如果您想要一个纯Python解决方案,则需要将读取器或写入器放在单独的线程中。
threading
包是一种轻量级的方法来实现这一点,可以方便地访问公共对象并且没有混乱的分叉。看到 subprocess 模块经过现代化以支持流和协程可能会很不错,这将允许更优雅地构建混合 Python 片段和 shell 片段的管道。
If you want a pure Python solution, you need to put either the reader or the writer in a separate thread. The
threading
package is a lightweight way to do this, with convenient access to common objects and no messy forking.It might be neat to see the
subprocess
module modernized to support streams and coroutines, which would allow pipelines that mix Python pieces and shell pieces to be constructed more elegantly.如果不想将所有数据保留在内存中,则必须使用 select。例如:
If you don't want to keep all the data in memory, you have to use select. E.g. something like:
一旦
cat
的标准输出操作系统管道缓冲区已满,您的代码就会死锁。如果您使用stdout=PIPE
;你必须及时消耗它,否则可能会发生你的情况的僵局。如果进程运行时不需要输出;您可以将其重定向到临时文件:
如果输入/输出很小(适合内存);您可以使用
.communicate()
一次传递所有输入并同时获取所有输出,它可以为您同时读/写:要手动同时读/写,您可以使用线程、asyncio、fcntl等等。@Jed 提供了一个简单的基于线程的解决方案。这是基于
asyncio
的解决方案:在 Unix 上,您可以使用基于
fcntl
的解决方案:Your code deadlocks as soon as
cat
's stdout OS pipe buffer is full. If you usestdout=PIPE
; you have to consume it in time otherwise the deadlock as in your case may happen.If you don't need the output while the process is running; you could redirect it to a temporary file:
If the input/output are small (fit in memory); you could pass the input all at once and get the output all at once using
.communicate()
that reads/writes concurrently for you:To read/write concurrently manually, you could use threads, asyncio, fcntl, etc. @Jed provided a simple thread-based solution. Here's
asyncio
-based solution:On Unix, you could use
fcntl
-based solution:这是我用来通过子进程加载 6G mysql 转储文件的东西。远离 shell=True。不安全并且从进程中启动浪费资源。
Here's something I used to load 6G mysql dump file loads via subprocess. Stay away from shell=True. Not secure and start out of process wasting resources.
对于这种事情,shell 比 subprocess 工作得更好。
编写非常简单的Python应用程序,从
sys.stdin
读取并写入sys.stdout
。使用 shell 管道将简单的应用程序连接在一起。
如果需要,可以使用
subprocess
启动管道或仅编写一行 shell 脚本。这是非常非常有效的。只要保持非常简单,它也可以移植到所有 Linux(和 Windows)。
For this kind of thing, the shell works a lot better than subprocess.
Write very simple Python apps which read from
sys.stdin
and write tosys.stdout
.Connect the simple apps together using a shell pipeline.
If you want, start the pipeline using
subprocess
or simply write a one-line shell script.This is very, very efficient. It's also portable to all Linux (and Windows) as long as you keep it very simple.
使用 aiofiles 和python 3.5 中的 asyncio:
有点复杂,但只需要 1024 字节内存即可在 stdin 中写入!
结果:
Using the aiofiles & asyncio in python 3.5:
A bit complicated, but you need only 1024 Bytes memory to writing in stdin!
Result:
下面是一个使用管道从 gzip 一次读取一条记录的示例(Python 3):
我知道有一个标准模块,它只是作为一个示例。您可以使用通信方法一次性读取整个输出(如 shell 反引号),但显然您必须注意内存大小。
下面是一个在 Linux 上将记录写入 lp(1) 程序的示例(又是 Python 3):
Here is an example (Python 3) of reading one record at a time from gzip using a pipe:
I know there is a standard module for that, it is just meant as an example. You can read the whole output in one go (like shell back-ticks) using the communicate method, but obviously you hav eto be careful of memory size.
Here is an example (Python 3 again) of writing records to the lp(1) program on Linux:
现在我知道这不会完全满足您的纯粹主义者,因为输入必须适合内存,并且您无法选择与输入输出交互工作,但至少这在您的示例中效果很好。通信方法可以选择将输入作为参数,如果您以这种方式向进程提供输入,它将起作用。
至于更大的问题,你可以继承 Popen,重写 __init__ 来接受类似流的对象作为 stdin、stdout、stderr 的参数,并重写 _communicate 方法(毛茸茸的 for跨平台,您需要执行两次(请参阅 subprocess.py 源代码)以在 stdin 流上调用 read() 并将输出 write() 到 stdout 和 stderr 流。这种方法让我困扰的是,据我所知,它还没有被完成。当以前没有做过明显的事情时,通常有一个原因(它没有按预期工作),但我不明白为什么它不应该,除了你需要流在 Windows 中是线程安全的事实。
Now I know this is not going to satisfy the purist in you completely, as the input will have to fit in memory, and you have no option to work interactively with input-output, but at least this works fine on your example. The communicate method optionally takes the input as an argument, and if you feed your process its input this way, it will work.
As for the larger problem, you can subclass Popen, rewrite
__init__
to accept stream-like objects as arguments to stdin, stdout, stderr, and rewrite the_communicate
method (hairy for crossplatform, you need to do it twice, see the subprocess.py source) to call read() on the stdin stream and write() the output to the stdout and stderr streams. What bothers me about this approach is that as far as I know, it hasn't already been done. When obvious things have not been done before, there's usually a reason (it doesn't work as intended), but I can't see why it shoudn't, apart from the fact you need the streams to be thread-safe in Windows.我能想到的最简单的解决方案:
缓冲版本:
The simplest solution I can think of:
Buffered version:
我正在寻找一个示例代码来增量迭代进程输出,因为该进程消耗来自提供的迭代器的输入(也增量)。基本上:
这里建议的一些解决方案允许使用线程(但并不总是方便)或使用 asyncio(在 Python 2.x 中不可用)来完成此操作。下面是允许执行此操作的工作实现示例。
I was looking for an example code to iterate over process output incrementally as this process consumes its input from provided iterator (incrementally as well). Basically:
Some of solutions suggested here allow to do it with threads (but it's not always convenient) or with asyncio (which is not available in Python 2.x). Below is example of working implementation that allows to do it.