Python: Creating a streaming, gzip-compressing file-like?



I'm trying to figure out the best way to compress a stream with Python's zlib.

I've got a file-like input stream (input, below) and an output function which accepts a file-like (output_function, below):

with open("file") as input:
    output_function(input)

And I'd like to gzip-compress input chunks before sending them to output_function:

with open("file") as input:
    output_function(gzip_stream(input))

It looks like the gzip module assumes that either the input or the output will be a gzip'd file-on-disk… So I assume that the zlib module is what I want.

However, it doesn't natively offer a simple way to create a streaming file-like… And the stream-compression it does support comes by way of manually adding data to a compression buffer, then flushing that buffer.

Of course, I could write a wrapper around zlib.Compress.compress and zlib.Compress.flush (Compress is returned by zlib.compressobj()), but I'd be worried about getting buffer sizes wrong, or something similar.
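
For concreteness, here is a rough, untested sketch of the kind of wrapper I mean, built on zlib.compressobj (wbits=zlib.MAX_WBITS | 16 selects the gzip container). Note it yields chunks rather than the file-like that output_function wants:

import zlib

def gzip_chunks(input, chunk_size=16 * 1024):
    # wbits = MAX_WBITS | 16 makes compressobj emit gzip-format output
    compressor = zlib.compressobj(wbits=zlib.MAX_WBITS | 16)
    for chunk in iter(lambda: input.read(chunk_size), b''):
        data = compressor.compress(chunk)
        if data:
            yield data
    yield compressor.flush()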

So, what's the simplest way to create a streaming, gzip-compressing file-like with Python?

Edit: To clarify, the input stream and the compressed output stream are both too large to fit in memory, so something like output_function(StringIO(zlib.compress(input.read()))) doesn't really solve the problem.


6 Answers

断爱 2024-08-27 03:01:08

It's quite kludgy (self-referencing, etc.; I just spent a few minutes writing it, nothing really elegant), but it does what you want if you're still interested in using gzip instead of zlib directly.

Basically, GzipWrap is a (very limited) file-like object that produces a gzipped file out of a given iterable (e.g., a file-like object, a list of strings, any generator...).

Of course, it produces binary output, so there was no sense in implementing "readline".

You should be able to expand it to cover other cases or to be used as an iterable object itself.

from gzip import GzipFile

class GzipWrap(object):
    # input is a file-like object (opened in binary mode) that feeds the input
    def __init__(self, input, filename=None):
        self.input = input
        self.buffer = b''
        # GzipFile writes its compressed output back into this object
        # through the write() method below
        self.zipper = GzipFile(filename, mode='wb', fileobj=self)

    def read(self, size=-1):
        if (size < 0) or len(self.buffer) < size:
            for s in self.input:
                self.zipper.write(s)
                if size > 0 and len(self.buffer) >= size:
                    self.zipper.flush()
                    break
            else:
                # input exhausted: finish the stream (writes the gzip trailer)
                self.zipper.close()
        if size < 0:
            ret, self.buffer = self.buffer, b''
        else:
            ret, self.buffer = self.buffer[:size], self.buffer[size:]
        return ret

    def flush(self):
        pass

    def write(self, data):
        self.buffer += data

    def close(self):
        self.input.close()
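
A minimal usage sketch, mirroring the question's example (output_function as defined there, and the file opened in binary mode):

with open("file", "rb") as input:
    output_function(GzipWrap(input))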
荒芜了季节 2024-08-27 03:01:08


Here is a cleaner, non-self-referencing version based on Ricardo Cárdenes' very helpful answer.

from gzip import GzipFile
from collections import deque


CHUNK = 16 * 1024


class Buffer (object):
    def __init__ (self):
        self.__buf = deque()
        self.__size = 0
    def __len__ (self):
        return self.__size
    def write (self, data):
        self.__buf.append(data)
        self.__size += len(data)
    def read (self, size=-1):
        if size < 0: size = self.__size
        ret_list = []
        while size > 0 and len(self.__buf):
            s = self.__buf.popleft()
            size -= len(s)
            ret_list.append(s)
        if size < 0:
            # we popped too much; push the overshoot back onto the deque
            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
            self.__buf.appendleft(remainder)
        ret = b''.join(ret_list)
        self.__size -= len(ret)
        return ret
    def flush (self):
        pass
    def close (self):
        pass


class GzipCompressReadStream (object):
    def __init__ (self, fileobj):
        self.__input = fileobj
        self.__buf = Buffer()
        self.__gzip = GzipFile(None, mode='wb', fileobj=self.__buf)
    def read (self, size=-1):
        while size < 0 or len(self.__buf) < size:
            s = self.__input.read(CHUNK)
            if not s:
                # input exhausted: finalize the gzip stream (CRC + size)
                self.__gzip.close()
                break
            self.__gzip.write(s)
        return self.__buf.read(size)

Advantages:

  • Avoids repeated string concatenation, which would cause the entire string to be copied repeatedly.
  • Reads a fixed CHUNK size from the input stream, instead of reading whole lines at a time (which can be arbitrarily long).
  • Avoids circular references.
  • Avoids the misleading public "write" method of GzipWrap, which is really only used internally (here it lives on the internal Buffer instead).
  • Takes advantage of name mangling for internal member variables.
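
A quick usage sketch under the same assumptions as the question, spooling the compressed stream to disk in CHUNK-sized reads:

with open("file", "rb") as input, open("file.gz", "wb") as out:
    stream = GzipCompressReadStream(input)
    for chunk in iter(lambda: stream.read(CHUNK), b''):
        out.write(chunk)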
北城孤痞 2024-08-27 03:01:08


The gzip module supports compressing to a file-like object: pass GzipFile a fileobj parameter as well as a filename. The filename you pass in doesn't need to exist, but the gzip header has a filename field which needs to be filled out.

Update

This answer does not work. Example:

# tmp/try-gzip.py 
import sys
import gzip

fd=gzip.GzipFile(fileobj=sys.stdin)
sys.stdout.write(fd.read())

output:

===> cat .bash_history  | python tmp/try-gzip.py  > tmp/history.gzip
Traceback (most recent call last):
  File "tmp/try-gzip.py", line 7, in <module>
    sys.stdout.write(fd.read())
  File "/usr/lib/python2.7/gzip.py", line 254, in read
    self._read(readsize)
  File "/usr/lib/python2.7/gzip.py", line 288, in _read
    pos = self.fileobj.tell()   # Save current position
IOError: [Errno 29] Illegal seek
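
The traceback shows why: in read mode, GzipFile calls tell()/seek() on its fileobj, and a pipe isn't seekable. The write direction has no such problem; a sketch (Python 3) that gzip-compresses stdin to stdout through a pipe:

import sys
import gzip
import shutil

# write mode only ever appends, so a non-seekable pipe is fine as output
with gzip.GzipFile(fileobj=sys.stdout.buffer, mode='wb') as gz:
    shutil.copyfileobj(sys.stdin.buffer, gz)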
贱贱哒 2024-08-27 03:01:08


Use the cStringIO (or StringIO) module in conjunction with zlib:

>>> import zlib
>>> from cStringIO import StringIO
>>> s = StringIO()
>>> s.write(zlib.compress("I'm a lumberjack"))
>>> s.seek(0)
>>> zlib.decompress(s.read())
"I'm a lumberjack"
时光暖心i 2024-08-27 03:01:08


This works (at least in python 3):

with s3.open(path, 'wb') as f:
    gz = gzip.GzipFile(filename, 'wb', 9, f)
    gz.write(b'hello')
    gz.flush()
    gz.close()

Here it writes to s3fs's file object with gzip compression applied.
The magic is the f argument, which is GzipFile's fileobj. You have to provide a file name for gzip's header.
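
The same pattern works with any writable binary file-like; a local-file sketch (s3, path, and filename above come from the answerer's environment):

import gzip

# any writable binary file-like can stand in for the s3fs object
with open("hello.txt.gz", "wb") as f:
    gz = gzip.GzipFile("hello.txt", "wb", 9, f)
    gz.write(b"hello")
    gz.close()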

棒棒糖 2024-08-27 03:01:08

更干净&由可重用组件组成的更通用的版本:

gzipped_iter = igizip(io_iter(input_file_obj))
gzipped_file_obj = iter_io(prefetch(gzipped_iter))

上面的函数来自我的要点

  • iter_ioio_iter 提供与 Iterable[AnyStr] 之间的透明转换 <-> SupportsRead[AnyStr]
  • igzip 进行流式 gzip 压缩
  • (可选) prefetch 同时通过线程从底层可迭代中拉取,正常情况下向消费者屈服,用于并发读/写
def as_bytes(s: str | bytes):
    if type(s) not in [str, bytes]:
        raise TypeError
    return s.encode() if isinstance(s, str) else s


def iter_io(iterable: Iterable[AnyStr], buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns a buffered file obj that reads bytes from an iterable of str/bytes.

    Example:

    iter_io(['abc', 'def', 'g']).read() == b'abcdefg'
    iter_io([b'abcd', b'efg']).read(5) == b'abcde'
    """
    class IterIO(io.RawIOBase):
        def __init__(self, iterable: Iterable[AnyStr]):
            self._leftover = b''
            self._iterable = (as_bytes(s) for s in iterable if s)

        def readable(self):
            return True

        def readinto(self, buf):
            try:
                chunk = self._leftover or next(self._iterable)
            except StopIteration:
                return 0    # indicate EOF

            output, self._leftover = chunk[:len(buf)], chunk[len(buf):]
            buf[:len(output)] = output
            return len(output)

    return io.BufferedReader(IterIO(iterable), buffer_size=buffer_size)


def io_iter(fo: SupportsRead[AnyStr], size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns an iterator that reads from a file obj in sized chunks.

    Example:

    list(io_iter(io.StringIO('abcdefg'), 3)) == ['abc', 'def', 'g']
    list(io_iter(io.BytesIO(b'abcdefg'), 4)) == [b'abcd', b'efg']

    Usage notes/TODO:
     * file obj isn't closed, fix /w keep_open=False and an internal contextmanager
    """
    return iter(lambda: fo.read(size), fo.read(0))


def igzip(chunks: Iterable[AnyStr], level=zlib.Z_DEFAULT_COMPRESSION):
    """
    Streaming gzip: lazily compresses an iterable of bytes or str (utf8)

    Example:

    gzipped_bytes_iter = igzip(['hello ', 'world!'])
    gzip.decompress(b''.join(gzipped_bytes_iter)).encode() == 'hello world!'
    """
    def gen():
        gzip_format = 0b10000
        c = zlib.compressobj(level=level, wbits=zlib.MAX_WBITS + gzip_format)

        yield from (c.compress(as_bytes(chunk)) for chunk in chunks)
        yield c.flush()

    return filter(None, gen())


def prefetch(iterable: Iterable[Any], n: int = 1) -> Iterator[Any]:
    """
    Prefetch an iterable via thread, yielding original contents as normal.

    Example:

    def slow_produce(*args):
        for x in args:
            time.sleep(1)
            yield x

    def slow_consume(iterable):
        for _ in iterable:
            time.sleep(1)

    slow_consume(prefetch(slow_produce('a', 'b')))  # takes 3 sec, not 4

    # Prefetch
    # produce: | 'a' | 'b' |
    # consume:       | 'a' | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3

    # No prefetch
    # produce: | 'a' |     | 'b' |
    # consume:       | 'a' |     | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3 --- 4

    Usage notes/TODO:
     * mem leak: Thread is GC'd only after iterable is fully consumed, fix /w __del__
    """
    queue = Queue(n)
    finished = object()

    def produce():
        for x in iterable:
            queue.put(x)
        queue.put(finished)

    t = Thread(target=produce, daemon=True)
    t.start()

    while True:
        item = queue.get()
        if item is finished:
            break
        else:
            yield item

An even cleaner & more generalized version made of reusable components:

gzipped_iter = igzip(io_iter(input_file_obj))
gzipped_file_obj = iter_io(prefetch(gzipped_iter))

The functions used here are from my gist:

  • iter_io and io_iter provide transparent conversion between Iterable[AnyStr] and SupportsRead[AnyStr]
  • igzip does streaming gzip compression
  • (optional) prefetch concurrently pulls from an underlying iterable via a thread, yielding to consumer as normal, for concurrent read/write
import io
import zlib
from queue import Queue
from threading import Thread
from typing import Any, AnyStr, Iterable, Iterator
# SupportsRead normally comes from _typeshed (type-checking only);
# typing.IO is a close runtime stand-in for these annotations
from typing import IO as SupportsRead


def as_bytes(s: str | bytes):
    if type(s) not in [str, bytes]:
        raise TypeError
    return s.encode() if isinstance(s, str) else s


def iter_io(iterable: Iterable[AnyStr], buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns a buffered file obj that reads bytes from an iterable of str/bytes.

    Example:

    iter_io(['abc', 'def', 'g']).read() == b'abcdefg'
    iter_io([b'abcd', b'efg']).read(5) == b'abcde'
    """
    class IterIO(io.RawIOBase):
        def __init__(self, iterable: Iterable[AnyStr]):
            self._leftover = b''
            self._iterable = (as_bytes(s) for s in iterable if s)

        def readable(self):
            return True

        def readinto(self, buf):
            try:
                chunk = self._leftover or next(self._iterable)
            except StopIteration:
                return 0    # indicate EOF

            output, self._leftover = chunk[:len(buf)], chunk[len(buf):]
            buf[:len(output)] = output
            return len(output)

    return io.BufferedReader(IterIO(iterable), buffer_size=buffer_size)


def io_iter(fo: SupportsRead[AnyStr], size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns an iterator that reads from a file obj in sized chunks.

    Example:

    list(io_iter(io.StringIO('abcdefg'), 3)) == ['abc', 'def', 'g']
    list(io_iter(io.BytesIO(b'abcdefg'), 4)) == [b'abcd', b'efg']

    Usage notes/TODO:
     * file obj isn't closed, fix /w keep_open=False and an internal contextmanager
    """
    return iter(lambda: fo.read(size), fo.read(0))


def igzip(chunks: Iterable[AnyStr], level=zlib.Z_DEFAULT_COMPRESSION):
    """
    Streaming gzip: lazily compresses an iterable of bytes or str (utf8)

    Example:

    gzipped_bytes_iter = igzip(['hello ', 'world!'])
    gzip.decompress(b''.join(gzipped_bytes_iter)).decode() == 'hello world!'
    """
    def gen():
        gzip_format = 0b10000
        c = zlib.compressobj(level=level, wbits=zlib.MAX_WBITS + gzip_format)

        yield from (c.compress(as_bytes(chunk)) for chunk in chunks)
        yield c.flush()

    return filter(None, gen())


def prefetch(iterable: Iterable[Any], n: int = 1) -> Iterator[Any]:
    """
    Prefetch an iterable via thread, yielding original contents as normal.

    Example:

    def slow_produce(*args):
        for x in args:
            time.sleep(1)
            yield x

    def slow_consume(iterable):
        for _ in iterable:
            time.sleep(1)

    slow_consume(prefetch(slow_produce('a', 'b')))  # takes 3 sec, not 4

    # Prefetch
    # produce: | 'a' | 'b' |
    # consume:       | 'a' | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3

    # No prefetch
    # produce: | 'a' |     | 'b' |
    # consume:       | 'a' |     | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3 --- 4

    Usage notes/TODO:
     * mem leak: Thread is GC'd only after iterable is fully consumed, fix /w __del__
    """
    queue = Queue(n)
    finished = object()

    def produce():
        for x in iterable:
            queue.put(x)
        queue.put(finished)

    t = Thread(target=produce, daemon=True)
    t.start()

    while True:
        item = queue.get()
        if item is finished:
            break
        else:
            yield item
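
An end-to-end sketch of the wiring shown at the top of this answer (a BytesIO stands in for the real input file obj):

import gzip
import io

data = b"hello world! " * 10000
input_file_obj = io.BytesIO(data)

gzipped_iter = igzip(io_iter(input_file_obj))
gzipped_file_obj = iter_io(prefetch(gzipped_iter))

assert gzip.decompress(gzipped_file_obj.read()) == data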