Since Apache Flink 1.15, you can use the compaction feature to merge several files into one.
How can we use compaction with the bulk Parquet format?
The existing implementations of RecordWiseFileCompactor.Reader (DecoderBasedReader and InputFormatBasedReader) do not seem suitable for Parquet.
Furthermore, we cannot find any example of compacting Parquet or other bulk formats.
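There are two types of file compactor mentioned in Flink's documentation: OutputStreamBasedFileCompactor, which writes the compacted result directly to an output stream (for example, by concatenating the file contents), and RecordWiseFileCompactor, which reads the input files record by record and writes the records to the compacted file.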
If I remember correctly, Parquet saves its metadata at the end of each file, so simply concatenating the bytes of several Parquet files does not produce a valid Parquet file. That is why we need RecordWiseFileCompactor: we have to read each whole Parquet file to get the metadata at its end, and only then can we use that metadata (number of row groups, schema) to parse the file.
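To make the footer argument concrete: according to the Parquet format specification, a file ends with the serialized footer (FileMetaData), a 4-byte little-endian footer length, and the magic bytes PAR1. The plain-Java sketch below reads that trailer from an in-memory byte array. The helper fakeFile is hypothetical and builds a stand-in file with placeholder bytes, not real Parquet data. The sketch also shows why byte-level concatenation fails: after concatenating two files, only the second footer can be located from the end.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ParquetTrailer {
    // Parquet files begin and end with this 4-byte magic.
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    // Reads the footer length from the last 8 bytes:
    // a 4-byte little-endian length followed by the trailing "PAR1".
    static int footerLength(byte[] file) {
        if (file.length < 12) { // leading magic + length + trailing magic
            throw new IllegalArgumentException("too short to be a Parquet file");
        }
        byte[] tail = Arrays.copyOfRange(file, file.length - 4, file.length);
        if (!Arrays.equals(tail, MAGIC)) {
            throw new IllegalArgumentException("missing trailing PAR1 magic");
        }
        return ByteBuffer.wrap(file, file.length - 8, 4)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt();
    }

    // Hypothetical stand-in file: magic, body, footer bytes, footer length, magic.
    // The body and footer are placeholder bytes, not real Parquet data.
    static byte[] fakeFile(byte[] body, byte[] footer) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length + footer.length + 4 + 4)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(MAGIC).put(body).put(footer).putInt(footer.length).put(MAGIC);
        return buf.array();
    }

    // Byte-level concatenation, i.e. what a naive stream-based compaction would produce.
    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] first = fakeFile(new byte[100], new byte[10]);
        byte[] second = fakeFile(new byte[50], new byte[7]);
        System.out.println(footerLength(first));                 // 10
        System.out.println(footerLength(concat(first, second))); // 7: the first footer is unreachable from the end
    }
}
```

The offsets stored in a real footer are also relative to the start of the original file, so even the surviving footer would point at the wrong bytes in a concatenated output.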
From the Java API, to construct a RecordWiseFileCompactor, we need an instance of RecordWiseFileCompactor.Reader.Factory.
There are two implementations of the interface RecordWiseFileCompactor.Reader.Factory: DecoderBasedReader.Factory and InputFormatBasedReader.Factory.
DecoderBasedReader.Factory creates a DecoderBasedReader instance, which reads the whole file content from an InputStream. We could load the bytes into a buffer and parse the file from that byte buffer, but that is obviously painful, so we don't use this implementation.
InputFormatBasedReader.Factory creates an InputFormatBasedReader, which reads the whole file content using the FileInputFormat supplier we pass to the InputFormatBasedReader.Factory constructor.
The InputFormatBasedReader instance uses the FileInputFormat to read the file record by record and passes each record to the writer we passed to the forBulkFormat call, until the end of the file.
The writer receives all the records and compacts them into one file.
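The read loop described above can be modeled in plain Java. This is a simplified sketch, not Flink's code: Reader and Writer are hypothetical stand-ins, assuming (as with Flink's RecordWiseFileCompactor.Reader) that read() returns null once a file is exhausted. Each in-memory list plays the role of one small input file, and the output list plays the role of the single compacted file.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class RecordWiseCompactionModel {
    // Hypothetical stand-in for a per-file reader: read() returns null once the file is exhausted.
    interface Reader<T> extends AutoCloseable {
        T read() throws Exception;
    }

    // Hypothetical stand-in for the writer that produces the single compacted file.
    interface Writer<T> {
        void write(T record) throws Exception;
    }

    // Opens a fresh reader for every input file and drains all of its records into one writer.
    static <F, T> void compact(List<F> inputs, Function<F, Reader<T>> readerFactory, Writer<T> writer)
            throws Exception {
        for (F input : inputs) {
            try (Reader<T> reader = readerFactory.apply(input)) {
                T record;
                while ((record = reader.read()) != null) {
                    writer.write(record);
                }
            }
        }
    }

    // Toy reader over an in-memory "file", represented as a list of records.
    static <T> Reader<T> listReader(List<T> file) {
        Iterator<T> it = file.iterator();
        return new Reader<T>() {
            @Override
            public T read() {
                return it.hasNext() ? it.next() : null;
            }

            @Override
            public void close() {
                // nothing to release for an in-memory list
            }
        };
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> smallFiles = List.of(List.of("a", "b"), List.of("c"), List.of("d", "e"));
        List<String> compacted = new ArrayList<>();
        Writer<String> writer = compacted::add; // the "output file" is just a list here
        compact(smallFiles, RecordWiseCompactionModel::listReader, writer);
        System.out.println(compacted); // [a, b, c, d, e]
    }
}
```

A fresh reader is created for every input file, which is presumably why the Flink factory takes a supplier rather than a single FileInputFormat instance: the format holds per-file state.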
So the question becomes: what is FileInputFormat, and how do we implement it?
Although FileInputFormat has many methods and fields, the InputFormatBasedReader source code shows that only four of its methods are called: open, reachedEnd, nextRecord and close.
Luckily, there's an AvroParquetReader in the package org.apache.parquet.avro that we can use. It already implements open/read/close, so we can wrap the reader inside a FileInputFormat and let the AvroParquetReader do all the dirty work.
Here's an example code snippet:
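The key detail when wrapping a pull-style reader such as AvroParquetReader (whose read() returns null at the end of the file) is that reachedEnd() must be answerable before nextRecord() is called, so the wrapper reads one record ahead. The plain-Java model below shows only that pattern, under stated assumptions: PullReader is a hypothetical stand-in for ParquetReader, and open takes the reader directly instead of a FileInputSplit. In a real FileInputFormat<GenericRecord> subclass, open(FileInputSplit) would instead build the reader, for example with AvroParquetReader.<GenericRecord>builder(...), and prefetch the first record in the same way.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LookAheadFormatModel {
    // Hypothetical stand-in for a pull-style reader such as ParquetReader: read() returns null at end of file.
    interface PullReader<T> {
        T read() throws IOException;
        void close() throws IOException;
    }

    // Models the four methods the compaction reader calls (open, reachedEnd, nextRecord, close).
    // reachedEnd() must be answered before nextRecord(), so the adapter reads one record ahead.
    static class LookAheadFormat<T> {
        private PullReader<T> reader;
        private T next; // the record that nextRecord() will hand out

        void open(PullReader<T> r) throws IOException {
            reader = r;
            next = reader.read(); // prefetch the first record (null for an empty file)
        }

        boolean reachedEnd() {
            return next == null;
        }

        T nextRecord(T reuse) throws IOException { // reuse is ignored, so callers may pass null
            T current = next;
            next = reader.read(); // prefetch the following record
            return current;
        }

        void close() throws IOException {
            if (reader != null) {
                reader.close();
            }
        }
    }

    // Toy reader over an in-memory list of records.
    static <T> PullReader<T> fromList(List<T> records) {
        Iterator<T> it = records.iterator();
        return new PullReader<T>() {
            @Override
            public T read() {
                return it.hasNext() ? it.next() : null;
            }

            @Override
            public void close() {
                // nothing to release for an in-memory list
            }
        };
    }

    // Drives the adapter the way a record-wise compaction reader would.
    static <T> List<T> drain(List<T> records) throws IOException {
        LookAheadFormat<T> format = new LookAheadFormat<>();
        format.open(fromList(records));
        List<T> out = new ArrayList<>();
        try {
            while (!format.reachedEnd()) {
                out.add(format.nextRecord(null));
            }
        } finally {
            format.close();
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(drain(List.of("r1", "r2", "r3"))); // [r1, r2, r3]
        System.out.println(drain(List.<String>of()));          // []
    }
}
```

Prefetching in open also handles an empty file naturally: reachedEnd() is true immediately and nextRecord() is never called.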
Then you can use the ExampleFileInputFormat like this:
I have successfully deployed this to Flink on Kubernetes (k8s) and compacted files on GCS. Here are some notes on deploying:
After all these steps, you can start your job and you're good to go.