Generate a big zip using streams [with zip4j] and upload it to S3



I'm working on generating a zip file that has to compress around 2000 documents totaling about 1 GB, and then uploading the zip file to an S3 bucket.

I'm using net.lingala.zip4j, which is a really nice Java library for handling zip files.
Based on the documentation (https://github.com/srikanth-lingala/zip4j), I'm using its stream-handling part.
The code is almost identical to the example from the documentation:

public ByteArrayOutputStream compress(FileCompressingContext fileCompressingContext) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ZipOutputStream zos = new ZipOutputStream(baos)) {
        if (fileCompressingContext.getFiles() != null) {
            for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
                addFileToZip(zos, file.getContent(), file.getName());
            }
        }

        if (fileCompressingContext.getFolders() != null) {
            for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
                for (FileCompressingContext.File file : folder.getFiles()) {
                    addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
                }
            }
        }
    }

    return baos;
}

private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
    byte[] buff = new byte[4096];
    int readLen;

    ZipParameters zp = new ZipParameters();
    zp.setFileNameInZip(fileName);
    zos.putNextEntry(zp);
    try (InputStream is = new ByteArrayInputStream(file)) {
        while ((readLen = is.read(buff)) != -1) {
            zos.write(buff, 0, readLen);
        }
    }

    zos.closeEntry();
}

The problem is that zos.closeEntry() throws a java.lang.OutOfMemoryError: Java heap space after about 1000 documents have been compressed:

java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[na:na]
at net.lingala.zip4j.io.outputstream.CountingOutputStream.write(CountingOutputStream.java:29) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipEntryOutputStream.write(ZipEntryOutputStream.java:33) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CipherOutputStream.write(CipherOutputStream.java:50) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CompressedOutputStream.write(CompressedOutputStream.java:26) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.deflate(DeflaterOutputStream.java:55) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.closeEntry(DeflaterOutputStream.java:63) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipOutputStream.closeEntry(ZipOutputStream.java:108) ~[zip4j-2.9.1.jar:na]

Do you think there is a way to incrementally stream the zip archive to S3 as it is being generated?
I mean something like periodically draining the ByteArrayOutputStream, uploading that chunk to S3, and then resetting the baos.

If not, what are the alternatives? Writing to disk, then reading the file back and uploading it to S3 (a rough sketch of that approach is shown below)? Or maybe compressing in batches?
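For what it's worth, a minimal sketch of that disk-based alternative could look like the method below. It is meant to live next to the addFileToZip(...) helper shown above, and it assumes the AWS SDK for Java v2 for the upload; the bucket name and key are placeholders, not part of the original code.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

// Streams the zip to a temporary file instead of a ByteArrayOutputStream,
// so heap usage stays bounded, then uploads the finished file in one call.
public void compressToDiskAndUpload(FileCompressingContext ctx) throws IOException {
    File zipFile = File.createTempFile("export", ".zip");
    try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zipFile))) {
        for (FileCompressingContext.File file : ctx.getFiles()) {
            addFileToZip(zos, file.getContent(), file.getName()); // helper from the question
        }
    }

    try (S3Client s3 = S3Client.create()) {
        s3.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")      // placeholder bucket
                        .key("exports/file.zip")  // placeholder key
                        .build(),
                RequestBody.fromFile(zipFile));
    } finally {
        zipFile.delete();
    }
}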

Just out of curiosity, I've tried processing the documents in batches: something like writing to the zip after every 100 documents and then restarting the process. The issue was that each batch of 100 documents overwrote the existing zip, so this didn't work either.
I've tried calling this for every 100 documents:

new ZipFile("test.zip").addStream(new ByteArrayInputStream(baos_lisb.toByteArray()), zp);

but as I said, it overwrites the zip content instead of appending to it.

Thanks in advance.


Comments (1)

梦太阳 2025-01-22 17:54:32


Funnily enough, it was on my local machine that I got the OutOfMemoryError during the zip generation.

In the testing environment, I got an OutOfMemoryError already during the retrieval of the documents, so Hibernate was complaining too; that is a step before the generation. This probably happened because the local machine has 16 GB of memory while the testing environment has only 1 GB.

So the solution was built on the following steps:

  1. Retrieve the files in batches with Hibernate, flushing and clearing the transactional EntityManager after each batch, in order to force Hibernate not to keep all the files in memory. The batch size was 50 documents (a minimal sketch of this batching pattern follows the list).
  2. Adapt the zip4j compression code to use AWS S3 multipart upload, so that only one batch of files is compressed and uploaded at a time, and reset the buffer afterwards to avoid the OutOfMemoryError.
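Here is a minimal sketch of the batching pattern from step 1, assuming a javax.persistence EntityManager and a hypothetical Document entity; the entity, query, and class names are illustrative, not the author's actual code.

import java.util.List;
import java.util.function.Consumer;

import javax.persistence.EntityManager;

// "Document" is assumed to be a JPA entity mapped elsewhere (hypothetical).
public class BatchedDocumentReader {

    private static final int BATCH_SIZE = 50; // the answer used batches of 50 documents

    private final EntityManager entityManager;

    public BatchedDocumentReader(EntityManager entityManager) {
        this.entityManager = entityManager;
    }

    // Loads documents page by page and hands each batch to the zip/upload step.
    public void forEachBatch(Consumer<List<Document>> batchConsumer) {
        int offset = 0;
        while (true) {
            List<Document> batch = entityManager
                    .createQuery("select d from Document d order by d.id", Document.class)
                    .setFirstResult(offset)
                    .setMaxResults(BATCH_SIZE)
                    .getResultList();
            if (batch.isEmpty()) {
                break;
            }

            batchConsumer.accept(batch); // e.g. add this batch's files to the zip

            // Flush pending changes and clear the persistence context so that
            // Hibernate does not keep every loaded document (and its content) in memory.
            entityManager.flush();
            entityManager.clear();

            offset += BATCH_SIZE;
        }
    }
}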

Step 2 was designed and adapted based on: https://www.bennadel.com/blog/3971-generate-and-incrementally-stream-a-zip-archive-to-amazon-s3-using-multipart-uploads-in-lucee-cfml-5-3-7-47.htm

So the code from the initial question became as follows:

@Override
public void compressAndPublish(final FileCompressingContext fileCompressingContext) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ZipOutputStream zos = new ZipOutputStream(baos);
    if (fileCompressingContext.getFiles() != null) {
        for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
            addFileToZip(zos, file.getContent(), file.getName());
        }
    }

    if (fileCompressingContext.getFolders() != null) {
        // 1. initialize the multipart upload
        String uploadId = fileHandlerService.initialiseMultiPart(FileHandlerContext.builder()
                .id(fileCompressingContext.getTaskId())
                .root(bucket)
                .fileName("file.zip")
                .build());

        int partNumber = 0;
        int docNr = 0;
        List<CompletedPart> completedParts = new ArrayList<>();

        for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
            while (!folder.getDocPks().isEmpty()) {
                extractDocuments(folder, fileCompressingContext);

                for (FileCompressingContext.File file : folder.getFiles()) {
                    if (baos.size() > PART_SIZE) {
                        log.debug("Id:{} - Preparing for upload part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
                        FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                                .id(fileCompressingContext.getTaskId())
                                .root(bucket)
                                .fileName("file.zip")
                                .fileContent(baos.toByteArray())
                                .build();
                        // 2. upload one part of the zip
                        completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));

                        partNumber++;
                        baos.reset();
                    }

                    addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
                    docNr++;
                }

                folder.getFiles().clear();
            }
        }

        finalizeZipContent(zos, baos);

        // 3. check whether any data remains (the last part may be under 5 MB)
        if (baos.size() != 0) {
            log.debug("Id:{} - Preparing LAST upload part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);

            FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                    .id(fileCompressingContext.getTaskId())
                    .root(bucket)
                    .fileName("file.zip")
                    .fileContent(baos.toByteArray())
                    .build();
            completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
        }

        // 4. complete the multipart upload
        FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                .id(fileCompressingContext.getTaskId())
                .root(bucket)
                .fileName("file.zip")
                .build();
        fileHandlerService.finishMultipartUpload(fileHandlerContext, uploadId, completedParts);

        log.debug("Id:{} - Multipart upload finished with partNr:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
    } else {
        finalizeZipContent(zos, baos);

        FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                .id(fileCompressingContext.getTaskId())
                .root(bucket)
                .fileName("file.zip")
                .fileContent(baos.toByteArray())
                .fileExtension("application/zip")
                .build();
        fileHandlerService.store(fileHandlerContext);
    }
}
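One detail worth noting: PART_SIZE is a constant defined elsewhere in the class, and because S3 multipart uploads require every part except the last to be at least 5 MiB, the threshold has to be at or above that limit. A plausible definition (an assumption, not the author's actual value) would be:

// Assumed constant; any value of at least 5 MiB works for the non-final parts.
private static final int PART_SIZE = 10 * 1024 * 1024; // 10 MiB per uploaded part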

So the only things that changed were the integration with AWS multipart upload, which allows large data to be uploaded in chunks, and the resetting of the buffer after every upload with baos.reset() (a sketch of those multipart calls follows).
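For reference, here is a minimal sketch of the kind of S3 calls the fileHandlerService methods presumably wrap, using the AWS SDK for Java v2 (the CompletedPart type above suggests SDK v2). This is an assumption about the implementation, not the author's actual service; the bucket and key are placeholders, and note that S3 part numbers must start at 1.

import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

public class S3MultipartSketch {

    private final S3Client s3 = S3Client.create();
    private final List<CompletedPart> completedParts = new ArrayList<>();

    // 1. start the multipart upload and remember the uploadId
    public String initialise(String bucket, String key) {
        CreateMultipartUploadResponse response = s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build());
        return response.uploadId();
    }

    // 2. upload one chunk; part numbers start at 1, and every part except
    //    the last one must be at least 5 MiB
    public void uploadPart(String bucket, String key, String uploadId, int partNumber, byte[] bytes) {
        UploadPartResponse response = s3.uploadPart(
                UploadPartRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .uploadId(uploadId)
                        .partNumber(partNumber)
                        .build(),
                RequestBody.fromBytes(bytes));
        completedParts.add(CompletedPart.builder()
                .partNumber(partNumber)
                .eTag(response.eTag())
                .build());
    }

    // 3. complete the upload so S3 assembles the parts into one object
    public void complete(String bucket, String key, String uploadId) {
        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket)
                .key(key)
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                .build());
    }
}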

Another important step is this method:

private void finalizeZipContent(ZipOutputStream zos, ByteArrayOutputStream baos) throws IOException {
    zos.flush();
    zos.close();
    baos.close();
}

which closes the ZipOutputStream and the ByteArrayOutputStream. If this step is not done at the end, the zip will appear to be corrupted.

The method addFileToZip(...) can also be written much more simply:

private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
    ZipParameters zp = new ZipParameters();
    zp.setFileNameInZip(fileName);
    zos.putNextEntry(zp);
    zos.write(file);
    zos.closeEntry();
    zos.flush();
}

since there is no need to define a fixed-size byte array buffer.

I really hope this will help someone and save them some time. Cheers.
