Using compaction with bulk format

Posted on 2025-02-09 14:07:00

Since version 1.15 of Apache Flink, you can use the compaction feature to merge several files into one.
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction

How can we use compaction with bulk Parquet format?
The existing implementations for the RecordWiseFileCompactor.Reader (DecoderBasedReader and InputFormatBasedReader) do not seem suitable for Parquet.

Furthermore, we cannot find any example of compacting Parquet or other bulk formats.

Comments (1)

感性 2025-02-16 14:07:00

There are two types of file compactors mentioned in Flink's documentation.

OutputStreamBasedFileCompactor: The users can write the compacted results into an output stream. This is useful when the users don't want to or can't read records from the input files.

RecordWiseFileCompactor: The compactor can read records one-by-one from the input files and write them into the result file, similar to the FileWriter.

If I remember correctly, Parquet saves meta information at the end of the file. So obviously we need to use RecordWiseFileCompactor, because we need to read the whole Parquet file to get the meta information at its end. Then we can use the meta information (number of row groups, schema) to parse the file.

From the Java API, to construct a RecordWiseFileCompactor, we need an instance of RecordWiseFileCompactor.Reader.Factory.

There are two implementations of the RecordWiseFileCompactor.Reader.Factory interface: DecoderBasedReader.Factory and InputFormatBasedReader.Factory.

DecoderBasedReader.Factory creates a DecoderBasedReader instance, which reads the whole file content from an InputStream. We could load the bytes into a buffer and parse the file from the byte buffer, which is obviously painful. So we don't use this implementation.

InputFormatBasedReader.Factory creates an InputFormatBasedReader, which reads the whole file content using the FileInputFormat produced by the supplier we pass to the InputFormatBasedReader.Factory constructor.

The InputFormatBasedReader instance uses the FileInputFormat to read record by record and passes the records to the writer we passed to the forBulkFormat call, until the end of the file.

The writer receives all the records and compacts them into one file.

So the question becomes: what is FileInputFormat and how do we implement it?

Though the FileInputFormat class has many methods and fields, the InputFormatBasedReader source code mentioned above shows that only four of its methods are called:

  • open(FileInputSplit fileSplit), which opens the file
  • reachedEnd(), which checks whether we have hit the end of the file
  • nextRecord(), which reads the next record from the opened file
  • close(), which cleans up resources

Luckily, there's an AvroParquetReader from the package org.apache.parquet.avro that we can utilize. It already implements open/read/close, so we can wrap the reader inside a FileInputFormat and let the AvroParquetReader do all the dirty work.
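To make the open/read/close pattern concrete, a minimal standalone sketch of the raw AvroParquetReader read loop could look like the following (the ReadLoopSketch class name and the /tmp/example.parquet path are just placeholders):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class ReadLoopSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder path; in the compactor this comes from the FileInputSplit.
        InputFile inputFile = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration());
        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
            GenericRecord record;
            // read() returns null at the end of the file; this is the loop that
            // reachedEnd()/nextRecord() map onto in the FileInputFormat below.
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }
    }
}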

Here's an example code snippet that wraps this reader in a FileInputFormat:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

import java.io.IOException;

public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {

    private ParquetReader<GenericRecord> parquetReader;
    private GenericRecord readRecord;


    @Override
    public void open(FileInputSplit split) throws IOException {
        Configuration config = new Configuration();
        // set hadoop config here
        // for example, if you are using gcs, set fs.gs.impl here
        // i haven't tried to use core-site.xml but i believe this is feasible
        InputFile inputFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
        parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
        readRecord = parquetReader.read();
    }

    @Override
    public void close() throws IOException {
        parquetReader.close();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return readRecord == null;
    }

    @Override
    public GenericRecord nextRecord(GenericRecord genericRecord) throws IOException {
        GenericRecord r = readRecord;
        readRecord = parquetReader.read();
        return r;
    }
}

Then you can use the ExampleFileInputFormat as shown below:

FileSink<GenericRecord> sink = FileSink.forBulkFormat(
                new Path(path),
                AvroParquetWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                        .enableCompactionOnCheckpoint(10)
                        .build(),
                new RecordWiseFileCompactor<>(
                        new InputFormatBasedReader.Factory<>(new SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException>() {
                            @Override
                            public FileInputFormat<GenericRecord> get() throws IOException {
                                FileInputFormat<GenericRecord> format = new ExampleFileInputFormat();
                                return format;
                            }
                        })
                ))
        .build();
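
One wiring note: with the strategy above, compaction is triggered after checkpoints, so the job needs checkpointing enabled, and the sink is attached with sinkTo. Below is a minimal sketch of that wiring, assuming an upstream DataStream<GenericRecord> named records and the sink built as above (the method and job names are placeholders):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WireUpSketch {
    // "records" stands for whatever upstream stream of GenericRecord your job produces,
    // and "sink" is the FileSink built as shown above.
    public static void wireUp(StreamExecutionEnvironment env,
                              DataStream<GenericRecord> records,
                              FileSink<GenericRecord> sink) throws Exception {
        // Compaction (and committing of pending files) happens on checkpoints,
        // so checkpointing must be enabled.
        env.enableCheckpointing(60_000L); // e.g. every 60 seconds
        records.sinkTo(sink);
        env.execute("parquet-compaction-example");
    }
}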

I have successfully deployed this to Flink on k8s and compacted files on GCS. Here are some notes for deployment.

  • You need to download the Flink shaded Hadoop jar from https://flink.apache.org/downloads.html (search for "Pre-bundled Hadoop" on the page) and put the jar into $FLINK_HOME/lib/
  • If you are writing files to some object storage, for example GCS, you need to follow the plugin instructions. Remember to put the plugin jar into the plugins folder, not the lib folder.
  • If you are writing files to some object storage, you also need to download the connector jar from your cloud service provider. For example, I'm using GCS and downloaded the gcs-connector jar following the GCP instructions. Put the jar into some folder other than $FLINK_HOME/lib or $FLINK_HOME/plugins. I put the connector jar into a newly created folder, $FLINK_HOME/hadoop-lib
  • Set the environment variable HADOOP_CLASSPATH=$FLINK_HOME/lib/YOUR_SHADED_HADOOP_JAR:$FLINK_HOME/hadoop-lib/YOUR_CONNECTOR_JAR

After all these steps, you can start your job and you're good to go.
