Since version 1.15 of Apache Flink, you can use the compaction feature to merge several files into one.
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction
How can we use compaction with bulk Parquet format?
The existing implementations of RecordWiseFileCompactor.Reader (DecoderBasedReader and InputFormatBasedReader) do not seem suitable for Parquet.
Furthermore, we cannot find any examples of compacting Parquet or other bulk formats.
There are two types of file compactors mentioned in Flink's documentation: OutputStreamBasedFileCompactor and RecordWiseFileCompactor.
If I remember correctly, Parquet saves its meta information at the end of the file. So obviously we need to use the RecordWiseFileCompactor: we have to read the whole Parquet file to get the meta information at its end, and we can then use that meta information (number of row groups, schema) to parse the file.
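As an illustration (not required for the compactor itself), this footer metadata can be inspected with parquet-hadoop; the file path here is a placeholder:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

// Sketch: the footer Parquet writes at the end of a file carries
// the schema and the row-group layout.
static void printFooter(Path file) throws IOException {
    try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
        ParquetMetadata footer = reader.getFooter();
        MessageType schema = footer.getFileMetaData().getSchema();
        int rowGroups = footer.getBlocks().size();
        System.out.println("row groups: " + rowGroups + ", schema: " + schema);
    }
}
```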
From the Java API, to construct a RecordWiseFileCompactor we need an instance of RecordWiseFileCompactor.Reader.Factory.
There are two implementations of the interface RecordWiseFileCompactor.Reader.Factory: DecoderBasedReader.Factory and InputFormatBasedReader.Factory.
DecoderBasedReader.Factory creates a DecoderBasedReader instance, which reads the whole file content from an InputStream. We would have to load the bytes into a buffer and parse the file from the byte buffer, which is obviously painful, so we don't use this implementation.
InputFormatBasedReader.Factory creates an InputFormatBasedReader, which reads the whole file content using the FileInputFormat supplier we pass to the InputFormatBasedReader.Factory constructor.
The InputFormatBasedReader instance uses the FileInputFormat to read record by record and passes the records to the writer we passed to the forBulkFormat call, until the end of the file.
The writer receives all the records and compacts them into one file.
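To sketch how these pieces fit together (assuming, per the Flink 1.15 API, that the factory takes a serializable supplier of FileInputFormat; ExampleFileInputFormat is a hypothetical implementation shown further below):

```java
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.connector.file.sink.compactor.InputFormatBasedReader;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.util.function.SerializableSupplierWithException;

// The factory wraps a serializable supplier that creates a fresh
// FileInputFormat for every file that gets compacted.
SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException> supplier =
        ExampleFileInputFormat::new; // hypothetical class, defined below

RecordWiseFileCompactor<GenericRecord> compactor =
        new RecordWiseFileCompactor<>(new InputFormatBasedReader.Factory<>(supplier));
```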
So the question becomes: what is FileInputFormat, and how do we implement it?
Though the class FileInputFormat has many methods and fields, we know from the InputFormatBasedReader source code mentioned above that only four of them are called: open, reachedEnd, nextRecord, and close.
Luckily, there's an AvroParquetReader in the package org.apache.parquet.avro that we can utilize. It already implements open/read/close, so we can wrap that reader inside a FileInputFormat and let AvroParquetReader do all the dirty work.
Here's an example code snippet.
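The following is a minimal sketch that assumes Avro GenericRecord as the record type; ExampleFileInputFormat is an illustrative name, and its four overrides delegate to AvroParquetReader:

```java
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Hypothetical wrapper: implements the four methods InputFormatBasedReader
// needs (open, reachedEnd, nextRecord, close) on top of an AvroParquetReader.
public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {

    private transient ParquetReader<GenericRecord> reader;
    private transient GenericRecord next;

    @Override
    public void open(FileInputSplit split) throws IOException {
        // Bridge Flink's path to a Hadoop path so parquet-avro can read it.
        org.apache.hadoop.fs.Path hadoopPath =
                new org.apache.hadoop.fs.Path(split.getPath().toUri());
        reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(hadoopPath, new Configuration()))
                .build();
        next = reader.read(); // read ahead one record; null means the file is empty
    }

    @Override
    public boolean reachedEnd() {
        return next == null;
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuse) throws IOException {
        GenericRecord current = next;
        next = reader.read(); // AvroParquetReader returns null at end of file
        return current;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
```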
Then you can use the ExampleFileInputFormat as shown below.
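A sketch of the sink wiring with the Flink 1.15 compaction API; the output path, checkpoint interval, and thread count are placeholders to tune for your job:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.InputFormatBasedReader;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

public class CompactingSinkExample {

    // Builds a Parquet FileSink with compaction enabled.
    public static FileSink<GenericRecord> buildSink(Schema schema) {
        return FileSink
                .forBulkFormat(
                        new Path("gs://my-bucket/output"), // placeholder path
                        AvroParquetWriters.forGenericRecord(schema))
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .enableCompactionOnCheckpoint(5) // compact every 5 checkpoints
                                .setNumCompactThreads(4)
                                .build(),
                        new RecordWiseFileCompactor<>(
                                new InputFormatBasedReader.Factory<>(ExampleFileInputFormat::new)))
                .build();
    }
}
```

The sink is then attached to a DataStream with stream.sinkTo(...) as usual.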
I have successfully deployed this with Flink on Kubernetes and compacted files on GCS. There are some notes for deploying.
After all these steps, you can start your job and you're good to go.