如何为 Popen 的标准输入连接多个文件

发布于 2024-11-07 14:52:30 字数 1003 浏览 0 评论 0原文

我正在将 bash 脚本移植到 python 2.6,并且想要替换一些代码:

cat $( ls -tr xyz_`date +%F`_*.log ) | filter args > bzip2

我想我想要类似于 http://docs.python.org/release/2.6/library/subprocess.html,唉...

p1 = Popen(["filter", "args"], stdin=*?WHAT?*, stdout=PIPE)
p2 = Popen(["bzip2"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]

但是,我不确定如何最好地提供 p1stdin 值,以便它连接输入文件。似乎我可以添加......

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = ... stdin=p0.stdout ...

但这似乎超出了使用(缓慢、低效)管道来调用具有重要功能的外部程序的范围。 (任何像样的 shell 都会在内部执行 cat。)

因此,我可以想象一个满足文件对象 API 要求的自定义类,因此可以用于 p1 的 stdin,连接任意其他文件对象。 (编辑:现有答案解释了为什么这是不可能的

Python 2.6 是否有解决此需求/想要的机制,或者可能另一个 Popencat 在 python 圈子里被认为是完美的吗?

谢谢。

I'm porting a bash script to python 2.6, and want to replace some code:

cat $( ls -tr xyz_`date +%F`_*.log ) | filter args > bzip2

I guess I want something similar to the "Replacing shell pipe line" example at http://docs.python.org/release/2.6/library/subprocess.html, ala...

p1 = Popen(["filter", "args"], stdin=*?WHAT?*, stdout=PIPE)
p2 = Popen(["bzip2"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]

But, I'm not sure how best to provide p1's stdin value so it concatenates the input files. Seems I could add...

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = ... stdin=p0.stdout ...

...but that seems to be crossing beyond use of (slow, inefficient) pipes to call external programs with significant functionality. (Any decent shell performs the cat internally.)

So, I can imagine a custom class that satisfies the file object API requirements and can therefore be used for p1's stdin, concatenating arbitrary other file objects. (EDIT: existing answers explain why this isn't possible)

Does python 2.6 have a mechanism addressing this need/want, or might another Popen to cat be considered perfectly fine in python circles?

Thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

情深缘浅 2024-11-14 14:52:30

您可以用 Python 代码替换您正在执行的所有操作,但外部实用程序除外。这样,只要您的外部实用程序是可移植的,您的程序就将保持可移植性。您还可以考虑将 C++ 程序转换为库并使用 Cython 与其交互。正如 Messa 所示,datetime.strftime 替换,通配符由 glob.glob 完成,cat 可以替换为读取列表中的所有文件并将它们写入程序的输入。对 bzip2 的调用可以替换为 bz2 模块,但这会使您的程序变得复杂,因为您必须同时读写。为此,如果数据很大,您需要使用 p.communicate 或线程(select.select 会是更好的选择,但它不适用于视窗)。

import sys
import bz2
import glob
import time
import threading
import subprocess

output_filename = '../whatever.bz2'
input_filenames = glob.glob(time.strftime("xyz_%F_*.log"))
p = subprocess.Popen(['filter', 'args'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output = open(output_filename, 'wb')
output_compressor = bz2.BZ2Compressor()

def data_reader():
    for filename in input_filenames:
        f = open(filename, 'rb')
        p.stdin.writelines(iter(lambda: f.read(8192), ''))
    p.stdin.close()

input_thread = threading.Thread(target=data_reader)
input_thread.start()

with output:
    for chunk in iter(lambda: p.stdout.read(8192), ''):
        output.write(output_compressor.compress(chunk))

    output.write(output_compressor.flush())

input_thread.join()
p.wait()

添加:如何检测文件输入类型

您可以使用文件扩展名或 libmagic 的 Python 绑定来检测文件的压缩方式。下面的代码示例同时执行这两项操作,并自动选择 magic(如果可用)。您可以选择适合您需要的部分并根据您的需要进行调整。 open_autodecompress 应该检测 mime 编码并使用适当的解压缩器(如果可用)打开文件。

import os
import gzip
import bz2
try:
    import magic
except ImportError:
    has_magic = False
else:
    has_magic = True


mime_openers = {
    'application/x-bzip2': bz2.BZ2File,
    'application/x-gzip': gzip.GzipFile,
}

ext_openers = {
    '.bz2': bz2.BZ2File,
    '.gz': gzip.GzipFile,
}


def open_autodecompress(filename, mode='r'):
    if has_magic:
        ms = magic.open(magic.MAGIC_MIME_TYPE)
        ms.load()
        mimetype = ms.file(filename)
        opener = mime_openers.get(mimetype, open)
    else:
        basepart, ext = os.path.splitext(filename)
        opener = ext_openers.get(ext, open)
    return opener(filename, mode)

You can replace everything that you're doing with Python code, except for your external utility. That way your program will remain portable as long as your external util is portable. You can also consider turning the C++ program into a library and using Cython to interface with it. As Messa showed, date is replaced with time.strftime, globbing is done with glob.glob and cat can be replaced with reading all the files in the list and writing them to the input of your program. The call to bzip2 can be replaced with the bz2 module, but that will complicate your program because you'd have to read and write simultaneously. To do that, you need to either use p.communicate or a thread if the data is huge (select.select would be a better choice but it won't work on Windows).

import sys
import bz2
import glob
import time
import threading
import subprocess

output_filename = '../whatever.bz2'
input_filenames = glob.glob(time.strftime("xyz_%F_*.log"))
p = subprocess.Popen(['filter', 'args'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output = open(output_filename, 'wb')
output_compressor = bz2.BZ2Compressor()

def data_reader():
    for filename in input_filenames:
        f = open(filename, 'rb')
        p.stdin.writelines(iter(lambda: f.read(8192), ''))
    p.stdin.close()

input_thread = threading.Thread(target=data_reader)
input_thread.start()

with output:
    for chunk in iter(lambda: p.stdout.read(8192), ''):
        output.write(output_compressor.compress(chunk))

    output.write(output_compressor.flush())

input_thread.join()
p.wait()

Addition: How to detect file input type

You can use either the file extension or the Python bindings for libmagic to detect how the file is compressed. Here's a code example that does both, and automatically chooses magic if it is available. You can take the part that suits your needs and adapt it to your needs. The open_autodecompress should detect the mime encoding and open the file with the appropriate decompressor if it is available.

import os
import gzip
import bz2
try:
    import magic
except ImportError:
    has_magic = False
else:
    has_magic = True


mime_openers = {
    'application/x-bzip2': bz2.BZ2File,
    'application/x-gzip': gzip.GzipFile,
}

ext_openers = {
    '.bz2': bz2.BZ2File,
    '.gz': gzip.GzipFile,
}


def open_autodecompress(filename, mode='r'):
    if has_magic:
        ms = magic.open(magic.MAGIC_MIME_TYPE)
        ms.load()
        mimetype = ms.file(filename)
        opener = mime_openers.get(mimetype, open)
    else:
        basepart, ext = os.path.splitext(filename)
        opener = ext_openers.get(ext, open)
    return opener(filename, mode)
心如狂蝶 2024-11-14 14:52:30

如果您查看 subprocess 模块实现的内部,您将看到 std{in,out,err} 预计是支持 fileno() 方法的文件对象,因此一个简单的串联具有 python 接口的类文件对象(甚至 StringIO 对象)在这里不适合。

如果它是迭代器,而不是文件对象,您可以使用itertools.chain。

当然,牺牲内存消耗你可以这样做:

import itertools, os

# ...

files = [f for f in os.listdir(".") if os.path.isfile(f)]
input = ''.join(itertools.chain(open(file) for file in files))
p2.communicate(input)

If you look inside the subprocess module implementation, you will see that std{in,out,err} are expected to be fileobjects supporting fileno() method, so a simple concatinating file-like object with python interface (or even a StringIO object) is not suitable here.

If it were iterators, not file objects, you could use itertools.chain.

Of course, sacrificing the memory consumption you can do something like this:

import itertools, os

# ...

files = [f for f in os.listdir(".") if os.path.isfile(f)]
input = ''.join(itertools.chain(open(file) for file in files))
p2.communicate(input)
谜泪 2024-11-14 14:52:30

使用子进程时,您必须考虑这样一个事实:Popen 在内部将使用文件描述符(处理程序)并为 stdin、stdout 和 stderr 调用 os.dup2(),然后再将它们传递给创建的子进程。

因此,如果您不想将系统 shell 管道与 Popen 一起使用:

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = Popen(["filter", "args"], stdin=p0.stdout, stdout=PIPE)

...

我认为您的另一个选择是在 python 中编写一个 cat 函数,并以类似 cat 的方式生成一个 文件 并传递此 file 到 p1 stdin,不要考虑实现 io API 的类,因为它不会像我所说的那样工作,因为子进程在内部只会获取文件描述符。

话虽如此,我认为你更好的选择是使用 unix PIPE 方式,如 subprocess文档

When using subprocess you have to consider the fact that internally Popen will use the file descriptor(handler) and call os.dup2() for stdin, stdout and stderr before passing them to the child process created.

So if you don't want to use system shell pipe with Popen:

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = Popen(["filter", "args"], stdin=p0.stdout, stdout=PIPE)

...

I think your other option is to write a cat function in python and generate a file in cat-like way and pass this file to p1 stdin, don't think about a class that implement the io API because it will not work as i said because internally the child process will just get the file descriptors.

With that said i think your better option is to use unix PIPE way like in subprocess doc.

活泼老夫 2024-11-14 14:52:30

这应该很容易。首先,使用 os.pipe 创建一个管道,然后使用 Popen将管道的读取端作为标准输入的过滤器。然后,对于目录中名称与模式匹配的每个文件,只需将其内容传递到管道的写入端。这应该与 shell 命令 cat ..._*.log | 完全相同。 filter args 确实如此。

更新: 抱歉,不需要来自 os.pipe 的管道,我忘记了 subprocess.Popen(..., stdin=subprocess.PIPE) 实际上为您创建了一个。另外,管道中不能填充太多数据,只有在读取先前的数据后才能将更多数据写入管道。

因此解决方案(例如使用 wc -l)是:

import glob
import subprocess

p = subprocess.Popen(["wc", "-l"], stdin=subprocess.PIPE)

processDate = "2011-05-18"  # or time.strftime("%F")
for name in glob.glob("xyz_%s_*.log" % processDate):
    f = open(name, "rb")
    # copy all data from f to p.stdin
    while True:
        data = f.read(8192)
        if not data:
            break  # reached end of file
        p.stdin.write(data)
    f.close()

p.stdin.close()
p.wait()

使用示例:

$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_a.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_b.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_c.log 
$ ./example.py 
   30000

This should be easy. First, create a pipe using os.pipe, then Popen the filter with read end of the pipe as standard input. Then for each file in the directory with name matching the pattern, just pass its contents to the write end of the pipe. This should be exactly the same what the shell command cat ..._*.log | filter args does.

Update: Sorry, pipe from os.pipe is not needed, I forgot that subprocess.Popen(..., stdin=subprocess.PIPE) actualy creates one for you. Also a pipe cannot be stuffed with too much data, more data can be written to a pipe only after the previous data are read.

So the solution (for example with wc -l) would be:

import glob
import subprocess

p = subprocess.Popen(["wc", "-l"], stdin=subprocess.PIPE)

processDate = "2011-05-18"  # or time.strftime("%F")
for name in glob.glob("xyz_%s_*.log" % processDate):
    f = open(name, "rb")
    # copy all data from f to p.stdin
    while True:
        data = f.read(8192)
        if not data:
            break  # reached end of file
        p.stdin.write(data)
    f.close()

p.stdin.close()
p.wait()

Usage example:

$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_a.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_b.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_c.log 
$ ./example.py 
   30000
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文