Create a zip file from a generator in Python?

Posted on 2024-07-09 05:09:17

I've got a large amount of data (a couple gigs) I need to write to a zip file in Python. I can't load it all into memory at once to pass to the .writestr method of ZipFile, and I really don't want to feed it all out to disk using temporary files and then read it back.

Is there a way to feed a generator or a file-like object to the ZipFile library? Or is there some reason this capability doesn't seem to be supported?

By zip file, I mean zip file. As supported in the Python zipfile package.

Comments (13)

你如我软肋 2024-07-16 05:09:17

The only solution is to rewrite the method it uses for zipping files to read from a buffer. It would be trivial to add this to the standard libraries; I'm kind of amazed it hasn't been done yet. I gather there's a lot of agreement the entire interface needs to be overhauled, and that seems to be blocking any incremental improvements.

import zipfile, zlib, binascii, struct
class BufferedZipFile(zipfile.ZipFile):
    def writebuffered(self, zipinfo, buffer):
        zinfo = zipinfo

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(1024 * 8)
            if not buf:
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            self.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        position = self.fp.tell()
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
岁月无声 2024-07-16 05:09:17

Changed in Python 3.5 (from official docs): Added support for writing to unseekable streams.

This means that zipfile.ZipFile can now write to streams that do not keep the entire file in memory. Such streams do not support seeking back over data that has already been written.

Here is a simple generator:

from zipfile import ZipFile, ZipInfo

def zipfile_generator(path, stream):
    with ZipFile(stream, mode='w') as zf:
        z_info = ZipInfo.from_file(path)
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                # Yield chunk of the zip file stream in bytes.
                yield stream.get()
    # ZipFile was closed.
    yield stream.get()

path is the path of the large file, given as a string or a path-like object.

stream is an instance of an unseekable stream class such as this one (designed according to the official docs):

from io import RawIOBase

class UnseekableStream(RawIOBase):
    def __init__(self):
        self._buffer = b''

    def writable(self):
        return True

    def write(self, b):
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        chunk = self._buffer
        self._buffer = b''
        return chunk

You can try this code online: https://repl.it/@IvanErgunov/zipfilegenerator
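
As a usage sketch, assuming the two definitions above and Python 3.6 or later (where zf.open accepts mode='w'), the generator can be drained chunk by chunk into any sink. The temporary file, the random payload, and the in-memory sink are made up for illustration; both definitions are repeated only so that the snippet runs on its own.

```python
import io
import os
import tempfile
from io import RawIOBase
from zipfile import ZipFile, ZipInfo


class UnseekableStream(RawIOBase):
    """Same write-only buffer as above, repeated so the sketch runs alone."""

    def __init__(self):
        self._buffer = b''

    def writable(self):
        return True

    def write(self, b):
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        chunk = self._buffer
        self._buffer = b''
        return chunk


def zipfile_generator(path, stream):
    with ZipFile(stream, mode='w') as zf:
        z_info = ZipInfo.from_file(path)
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                yield stream.get()
    yield stream.get()


# Hypothetical input: a 100 KB temporary file standing in for the large file.
payload = os.urandom(100_000)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    source_path = tmp.name

# Drain the generator; the sink here is an in-memory buffer, but it could
# equally be a socket or an HTTP response body.
sink = io.BytesIO()
for piece in zipfile_generator(source_path, UnseekableStream()):
    sink.write(piece)
os.remove(source_path)

# Read the archive back to check that the single entry round-trips.
with ZipFile(io.BytesIO(sink.getvalue())) as zf:
    names = zf.namelist()
    restored = zf.read(names[0])
```

Only one 16 KB read and the bytes it produces are held at a time, which is the point of the approach.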


There is also another way to create a generator, without ZipInfo and without manually reading and splitting your large file. You can pass a queue.Queue() object to your UnseekableStream() object and write to this queue in another thread. Then, in the current thread, you can simply read chunks from this queue as an iterable. See the docs.

P.S.
Python Zipstream by allanlei is an outdated and unreliable approach. It was an attempt to add support for unseekable streams before it was done officially.
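
The queue-based variant could look roughly like the following. This is only a sketch under stated assumptions, not a tested recipe: QueueStream, iter_zip, max_pending, and the sample entry are hypothetical names, and Python 3.6 or later is assumed for zf.open(..., mode='w', force_zip64=True).

```python
import io
import queue
import threading
import zipfile


class QueueStream(io.RawIOBase):
    """Write-only, unseekable stream that forwards every write to a queue."""

    def __init__(self, chunks):
        super().__init__()
        self._chunks = chunks

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        if data:
            self._chunks.put(data)  # blocks while the queue is full
        return len(data)


def iter_zip(entries, max_pending=16):
    """Yield zip bytes for (arcname, iterable_of_bytes) pairs."""
    chunks = queue.Queue(maxsize=max_pending)
    done = object()  # sentinel marking the end of the archive

    def produce():
        try:
            with zipfile.ZipFile(QueueStream(chunks), mode='w',
                                 compression=zipfile.ZIP_DEFLATED) as zf:
                for arcname, parts in entries:
                    # force_zip64 allows an entry of unknown size to pass 4 GiB
                    with zf.open(arcname, mode='w', force_zip64=True) as dest:
                        for part in parts:
                            dest.write(part)
        finally:
            chunks.put(done)

    threading.Thread(target=produce, daemon=True).start()
    while True:
        item = chunks.get()
        if item is done:
            break
        yield item


def numbers():
    # Hypothetical data source: any generator of bytes objects would do.
    for i in range(1000):
        yield ('%d\n' % i).encode('ascii')


archive = b''.join(iter_zip([('numbers.txt', numbers())]))
with zipfile.ZipFile(io.BytesIO(archive)) as zf:
    first_line = zf.read('numbers.txt').splitlines()[0]
    bad_entry = zf.testzip()
```

The bounded queue keeps memory use flat because the writer thread blocks once max_pending chunks are waiting. The sketch does not propagate exceptions from the writer thread, and a consumer that stops early leaves that thread blocked; a real implementation would need to handle both.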

丢了幸福的猪 2024-07-16 05:09:17

I took Chris B.'s answer and created a complete solution. Here it is in case anyone else is interested:

import os
import threading
from zipfile import *
import zlib, binascii, struct

class ZipEntryWriter(threading.Thread):
    def __init__(self, zf, zinfo, fileobj):
        self.zf = zf
        self.zinfo = zinfo
        self.fileobj = fileobj

        zinfo.file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = zf.fp.tell()

        zf._writecheck(zinfo)
        zf._didModify = True

        zinfo.CRC = 0
        zinfo.compress_size = compress_size = 0
        zf.fp.write(zinfo.FileHeader())

        super(ZipEntryWriter, self).__init__()

    def run(self):
        zinfo = self.zinfo
        zf = self.zf
        file_size = 0
        compress_size = 0  # must start at 0 here; it is accumulated in the loop below
        CRC = 0

        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None
        while True:
            buf = self.fileobj.read(1024 * 8)
            if not buf:
                self.fileobj.close()
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            zf.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            zf.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        position = zf.fp.tell()
        zf.fp.seek(zinfo.header_offset + 14, 0)
        zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        zf.fp.seek(position, 0)
        zf.filelist.append(zinfo)
        zf.NameToInfo[zinfo.filename] = zinfo

class EnhZipFile(ZipFile, object):

    def _current_writer(self):
        return hasattr(self, 'cur_writer') and self.cur_writer or None

    def assert_no_current_writer(self):
        cur_writer = self._current_writer()
        if cur_writer and cur_writer.is_alive():
            raise ValueError('An entry is already started for name: %s' % cur_writer.zinfo.filename)

    def write(self, filename, arcname=None, compress_type=None):
        self.assert_no_current_writer()
        super(EnhZipFile, self).write(filename, arcname, compress_type)

    def writestr(self, zinfo_or_arcname, bytes):
        self.assert_no_current_writer()
        super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)

    def close(self):
        self.finish_entry()
        super(EnhZipFile, self).close()

    def start_entry(self, zipinfo):
        """
        Start writing a new entry with the specified ZipInfo and return a
        file like object. Any data written to the file like object is
        read by a background thread and written directly to the zip file.
        Make sure to close the returned file object, before closing the
        zipfile, or the close() would end up hanging indefinitely.

        Only one entry can be open at any time. If multiple entries need to
        be written, make sure to call finish_entry() before calling any of
        these methods:
        - start_entry
        - write
        - writestr
        It is not necessary to explicitly call finish_entry() before closing
        zipfile.

        Example:
            zf = EnhZipFile('tmp.zip', 'w')
            w = zf.start_entry(ZipInfo('t.txt'))
            w.write("some text")
            w.close()
            zf.close()
        """
        self.assert_no_current_writer()
        r, w = os.pipe()
        self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
        self.cur_writer.start()
        return os.fdopen(w, 'w')

    def finish_entry(self, timeout=None):
        """
        Ensure that the ZipEntry that is currently being written is finished.
        Joins on any background thread to exit. It is safe to call this method
        multiple times.
        """
        cur_writer = self._current_writer()
        if not cur_writer or not cur_writer.is_alive():
            return
        cur_writer.join(timeout)

if __name__ == "__main__":
    zf = EnhZipFile('c:/tmp/t.zip', 'w')
    import time
    w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
    w.write("Line1\n")
    w.write("Line2\n")
    w.close()
    zf.finish_entry()
    w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
    w.write("Some text\n")
    w.close()
    zf.close()
最单纯的乌龟 2024-07-16 05:09:17

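gzip.GzipFile writes the data in gzipped chunks, and you can set the size of each chunk according to the number of lines read from the file.

An example:

import gzip

LINES_PER_CHUNK = 1000  # "X" in the original: lines to buffer before each write

with open('source', 'rb') as sourcefile, gzip.GzipFile('blah.gz', 'wb') as gz:
    chunks = []
    for line in sourcefile:
        chunks.append(line)
        if len(chunks) >= LINES_PER_CHUNK:
            gz.write(b"".join(chunks))
            gz.flush()
            chunks = []
    if chunks:
        # Write whatever is left after the last full chunk.
        gz.write(b"".join(chunks))
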
策马西风 2024-07-16 05:09:17

The essential compression is done by zlib.compressobj. ZipFile (under Python 2.5 on MacOSX) appears to be compiled. The Python 2.3 version is as follows.

You can see that it builds the compressed file in 8k chunks. Taking out the source file information is complex because a lot of source file attributes (like uncompressed size) are recorded in the zip file header.

def write(self, filename, arcname=None, compress_type=None):
    """Put the bytes from filename into the archive under the name
    arcname."""

    st = os.stat(filename)
    mtime = time.localtime(st.st_mtime)
    date_time = mtime[0:6]
    # Create ZipInfo instance to store file information
    if arcname is None:
        zinfo = ZipInfo(filename, date_time)
    else:
        zinfo = ZipInfo(arcname, date_time)
    zinfo.external_attr = st[0] << 16L      # Unix attributes
    if compress_type is None:
        zinfo.compress_type = self.compression
    else:
        zinfo.compress_type = compress_type
    self._writecheck(zinfo)
    fp = open(filename, "rb")

    zinfo.flag_bits = 0x00
    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    # Must overwrite CRC and sizes with correct data later
    zinfo.CRC = CRC = 0
    zinfo.compress_size = compress_size = 0
    zinfo.file_size = file_size = 0
    self.fp.write(zinfo.FileHeader())
    zinfo.file_offset = self.fp.tell()      # Start of file bytes
    if zinfo.compress_type == ZIP_DEFLATED:
        cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
             zlib.DEFLATED, -15)
    else:
        cmpr = None
    while 1:
        buf = fp.read(1024 * 8)
        if not buf:
            break
        file_size = file_size + len(buf)
        CRC = binascii.crc32(buf, CRC)
        if cmpr:
            buf = cmpr.compress(buf)
            compress_size = compress_size + len(buf)
        self.fp.write(buf)
    fp.close()
    if cmpr:
        buf = cmpr.flush()
        compress_size = compress_size + len(buf)
        self.fp.write(buf)
        zinfo.compress_size = compress_size
    else:
        zinfo.compress_size = file_size
    zinfo.CRC = CRC
    zinfo.file_size = file_size
    # Seek backwards and write CRC and file sizes
    position = self.fp.tell()       # Preserve current position in file
    self.fp.seek(zinfo.header_offset + 14, 0)
    self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
          zinfo.file_size))
    self.fp.seek(position, 0)
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo
Spring初心 2024-07-16 05:09:17

Some (many? most?) compression algorithms are based on looking at redundancies across the entire file.

Some compression libraries will choose between several compression algorithms based on which works best on the file.

I believe the ZipFile module does this, so it wants to see the entire file, not just pieces at a time.

Hence, it won't work with generators or files too big to load in memory. That would explain the limitation of the zipfile library.

笑脸一如从前 2024-07-16 05:09:17

In case anyone stumbles upon this question, which is still relevant in 2017 for Python 2.7, here's a working solution for a true streaming zip file, with no requirement for the output to be seekable as in the other cases. The secret is to set bit 3 of the general purpose bit flag (see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.1).

Note that this implementation will always create a ZIP64-style file, allowing the streaming to work for arbitrarily large files. It includes an ugly hack to force the zip64 end of central directory record, so be aware it will cause all zipfiles written by your process to become ZIP64-style.

import io
import time  # needed for the default date_time in streambuffer()
import zipfile
import zlib
import binascii
import struct

class ByteStreamer(io.BytesIO):
    '''
    Variant on BytesIO which lets you write and consume data while
    keeping track of the total filesize written. When data is consumed
    it is removed from memory, keeping the memory requirements low.
    '''
    def __init__(self):
        super(ByteStreamer, self).__init__()
        self._tellall = 0

    def tell(self):
        return self._tellall

    def write(self, b):
        orig_size = super(ByteStreamer, self).tell()
        super(ByteStreamer, self).write(b)
        new_size = super(ByteStreamer, self).tell()
        self._tellall += (new_size - orig_size)

    def consume(self):
        bytes = self.getvalue()
        self.seek(0)
        self.truncate(0)
        return bytes

class BufferedZipFileWriter(zipfile.ZipFile):
    '''
    ZipFile writer with true streaming (input and output).
    Created zip files are always ZIP64-style because it is the only safe way to stream
    potentially large zip files without knowing the full size ahead of time.

    Example usage:
    >>> def stream():
    >>>     bzfw = BufferedZipFileWriter()
    >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
    >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
    >>>             yield chunk
    >>>     yield bzfw.close()
    '''
    def __init__(self, compression=zipfile.ZIP_DEFLATED):
        self._buffer = ByteStreamer()
        super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)

    def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo.external_attr = 0o600 << 16     # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(chunksize)
            if not buf:
                break

            file_size += len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size += len(buf)

            self.fp.write(buf)
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes

        if cmpr:
            buf = cmpr.flush()
            compress_size += len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Write CRC and file sizes after the file data
        # Always write as zip64 -- only safe way to stream what might become a large zipfile
        fmt = '<LQQ'
        self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))

        self.fp.flush()
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
        yield self._buffer.consume()

    # The close method needs to be patched to force writing a ZIP64 file
    # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
    def close(self):
        tmp = zipfile.ZIP_FILECOUNT_LIMIT
        zipfile.ZIP_FILECOUNT_LIMIT = 0
        try:
            super(BufferedZipFileWriter, self).close()
        finally:
            # Restore the module-level limit even if close() raises
            zipfile.ZIP_FILECOUNT_LIMIT = tmp
        return self._buffer.consume()
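
The streaming loop above rests on three standard-library pieces: an incremental CRC via binascii.crc32, raw deflate via zlib.compressobj with wbits=-15, and a trailing record packed with struct once the sizes are known. Here is a minimal standalone sketch of just that part, assuming the same '<LQQ' layout as the code above; the name stream_member is hypothetical, and no zip headers are written:

```python
import binascii
import struct
import zlib


def stream_member(chunks):
    """Yield raw-deflate pieces for `chunks`, then a trailing size record.

    Hypothetical helper: it mirrors the compress/CRC bookkeeping only and
    does not write local file headers or a central directory.
    """
    cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
    crc = 0
    file_size = 0
    compress_size = 0
    for chunk in chunks:
        file_size += len(chunk)
        crc = binascii.crc32(chunk, crc) & 0xffffffff
        piece = cmpr.compress(chunk)
        if piece:
            compress_size += len(piece)
            yield piece
    tail = cmpr.flush()
    compress_size += len(tail)
    yield tail
    # CRC (32-bit) followed by 64-bit compressed and uncompressed sizes
    yield struct.pack('<LQQ', crc, compress_size, file_size)


data_chunks = [b"hello ", b"world"]
pieces = list(stream_member(data_chunks))
body, descriptor = b"".join(pieces[:-1]), pieces[-1]
```

Only one input chunk is held in memory at a time, which is why the sizes and CRC have to be emitted after the data rather than in the header.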
智商已欠费 2024-07-16 05:09:17

The gzip library will take a file-like object for compression.

class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])

You still need to provide a nominal filename for inclusion in the gzip file, but you can pass a file-like object as fileobj.

(This answer differs from that of Damnsweet, in that the focus should be on the data-source being incrementally read, not the compressed file being incrementally written.)

And I see now the original questioner won't accept Gzip :-(
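
For reference, a minimal sketch of feeding a generator into gzip.GzipFile. In write mode, fileobj is where the compressed bytes go, and the source is consumed chunk by chunk through write(). The names chunks and gzip_stream are illustrative, and an in-memory io.BytesIO stands in for a real destination file:

```python
import gzip
import io


def chunks():
    # Stand-in for a large, lazily produced data source
    yield b"first chunk\n"
    yield b"second chunk\n"


def gzip_stream(source, fileobj, filename="data.txt"):
    """Compress an iterable of bytes into `fileobj`, one chunk at a time."""
    with gzip.GzipFile(filename=filename, mode="wb", fileobj=fileobj) as gz:
        for chunk in source:
            gz.write(chunk)


buf = io.BytesIO()
gzip_stream(chunks(), buf)  # closing the GzipFile leaves `buf` open
```

This produces a gzip stream, not a zip archive, so it only helps if a single compressed file is acceptable.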

迷离° 2024-07-16 05:09:17

Now with Python 2.7 you can add data to the zipfile instead of a file:

http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr
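
Note that writestr takes the whole member as a single argument, so on its own it does not stream. A minimal sketch of the streaming alternative, assuming Python 3.6 or later, where ZipFile.open(name, mode="w") returns a writable file-like object; the helper name write_member_from_iterable is hypothetical:

```python
import io
import zipfile


def data_chunks():
    # Stand-in for a generator producing a large member piece by piece
    yield b"some bytes a"
    yield b"some bytes b"


def write_member_from_iterable(zf, arcname, chunks):
    """Write one archive member from an iterable of bytes."""
    # force_zip64=True because the final size is unknown up front
    with zf.open(arcname, mode="w", force_zip64=True) as member:
        for chunk in chunks:
            member.write(chunk)


out = io.BytesIO()
with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
    write_member_from_iterable(zf, "streamed.bin", data_chunks())
```

In practice `out` would be a file opened with open("my.zip", "wb"); io.BytesIO is used here only to keep the sketch self-contained.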

柏拉图鍀咏恒 2024-07-16 05:09:17

This is 2017. If you are still looking to do this elegantly, use Python Zipstream by allanlei.
So far, it is probably the only well written library to accomplish that.

标点 2024-07-16 05:09:17

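gzip.GzipFile writes the data in gzipped chunks, and you can set the size of those chunks according to the number of lines read from the file.

An example:

import gzip

X = 1000  # lines to buffer before each write; tune to your memory budget

with open('source', 'rb') as sourcefile, gzip.GzipFile('blah.gz', 'wb') as file:
    chunks = []
    for line in sourcefile:
        chunks.append(line)
        if len(chunks) >= X:
            file.write(b"".join(chunks))
            file.flush()
            chunks = []
    if chunks:
        # write whatever is left after the last full batch
        file.write(b"".join(chunks))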
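
If the source is binary rather than line-oriented, fixed-size byte chunks are usually a better fit than line counts. A small sketch using shutil.copyfileobj, which reads and writes in blocks of the given length; the function name gzip_copy and the 64 KiB default are illustrative choices:

```python
import gzip
import io
import shutil


def gzip_copy(src, dst, chunk_size=64 * 1024):
    """Copy a readable binary file object into `dst` as gzip, chunk by chunk."""
    with gzip.GzipFile(filename="source", mode="wb", fileobj=dst) as gz:
        shutil.copyfileobj(src, gz, chunk_size)


src = io.BytesIO(b"line one\nline two\n" * 1000)
dst = io.BytesIO()
gzip_copy(src, dst)
```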
沧笙踏歌 2024-07-16 05:09:17

You can use stream-zip for this (full disclosure: written mostly by me).

Say you have generators of bytes you want to zip:

def file_data_1():
    yield b'Some bytes a'
    yield b'Some bytes b'

def file_data_2():
    yield b'Some bytes c'
    yield b'Some bytes d'

You can create a single iterable of the zipped bytes of these generators:

from datetime import datetime
from stream_zip import ZIP_64, stream_zip

def zip_member_files():
    modified_at = datetime.now()
    perms = 0o600
    yield 'my-file-1.txt', modified_at, perms, ZIP_64, file_data_1()
    yield 'my-file-2.txt', modified_at, perms, ZIP_64, file_data_2()

zipped_chunks = stream_zip(zip_member_files())

And then, for example, save this iterable to disk by:

with open('my.zip', 'wb') as f:
    for chunk in zipped_chunks:
        f.write(chunk)
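
If a consumer expects a file-like object with read() rather than an iterable of chunks, a small adapter can bridge the two. This is a sketch built only on the standard library; the class name IterableReader is hypothetical and is not part of stream-zip:

```python
import io


class IterableReader(io.RawIOBase):
    """Hypothetical adapter: expose an iterable of bytes chunks via read()."""

    def __init__(self, chunks):
        super().__init__()
        self._chunks = iter(chunks)
        self._leftover = b""

    def readable(self):
        return True

    def readinto(self, b):
        # Pull from the iterable until there is data or it is exhausted
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0  # signals EOF
        n = min(len(b), len(self._leftover))
        b[:n] = self._leftover[:n]
        self._leftover = self._leftover[n:]
        return n


reader = IterableReader([b"ab", b"", b"cde"])
everything = reader.read()
```

Wrapping the adapter in io.BufferedReader gives the usual buffered read(n) behaviour while still pulling one chunk at a time from the underlying iterable.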
染火枫林 2024-07-16 05:09:17

The zipstream-ng library handles this exact scenario:

from zipstream import ZipStream

def my_iterator():
    yield b"some bytes"

def my_other_iterator():
    yield b"some bytes"

zs = ZipStream()
zs.add(my_iterator(), "file.ext")
zs.add(my_other_iterator(), "otherfile.ext")

with open("out.zip", "wb") as fp:
    fp.writelines(zs)