ParquetProtoWriters creates an unreadable Parquet file

Posted on 2025-02-07 19:58:19


My .proto file contains one field of map type.

message Foo {
    ...
    ...
    map<string, uint32> fooMap = 19; 
}

I'm consuming messages from a Kafka source and trying to write them as Parquet files to an S3 bucket.
The relevant part of the code looks like this:

  val basePath = new Path("s3a:// ...") // org.apache.flink.core.fs.Path

  env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "source")
    .map(x => toJavaProto(x))
    .sinkTo(
      FileSink
        .forBulkFormat(basePath, ParquetProtoWriters.forType(classOf[Foo]))
        .withOutputFileConfig(
          OutputFileConfig
            .builder()
            .withPartPrefix("foo")
            .withPartSuffix(".parquet")
            .build()
        )
        .build()
    )
    .setParallelism(1)
  env.execute()

The result is that a Parquet file is actually written to S3, but the file appears to be corrupted. When I try to read the file with the Avro / Parquet Viewer plugin I see this error:

Unable to process file .../Downloads/foo-9366c15f-270e-4939-ad88-b77ee27ddc2f-0.parquet
java.lang.UnsupportedOperationException: REPEATED not supported outside LIST or MAP. Type: repeated group fooMap = 19 { optional binary key (STRING) = 1; optional int32 value = 2; }
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:277)
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:134)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at uk.co.hadoopathome.intellij.viewer.fileformat.ParquetFileReader.getRecords(ParquetFileReader.java:99)
    at uk.co.hadoopathome.intellij.viewer.FileViewerToolWindow$2.doInBackground(FileViewerToolWindow.java:193)
    at uk.co.hadoopathome.intellij.viewer.FileViewerToolWindow$2.doInBackground(FileViewerToolWindow.java:184)
    at java.desktop/javax.swing.SwingWorker$1.call(SwingWorker.java:304)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.desktop/javax.swing.SwingWorker.run(SwingWorker.java:343)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Flink version 1.15, proto2.


Comments (1)

油饼 2025-02-14 19:58:19


There are some breaking changes in parquet-format and parquet-mr. I'm not familiar with Flink, but I guess you have to configure org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters to generate a correct format.
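For context, the writer in the question produced the legacy map encoding (visible in the error message), while Avro-based readers expect the spec-compliant MAP shape from the parquet-format LogicalTypes document. Roughly, the two schemas look like this (the second one is sketched from the spec, not dumped from a real file):

legacy, what the writer produced (from the error message):

repeated group fooMap = 19 {
  optional binary key (STRING) = 1;
  optional int32 value = 2;
}

spec-compliant MAP encoding:

optional group fooMap (MAP) = 19 {
  repeated group key_value {
    required binary key (STRING);
    optional int32 value;
  }
}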


I used parquet-mr directly and encountered the same issue. An Avro Parquet reader is unable to read the Parquet file generated by the following code:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;

...

var conf = new Configuration();
// false = legacy (non-spec) encoding for repeated/map fields
ProtoWriteSupport.setWriteSpecsCompliant(conf, false);

var builder = ProtoParquetWriter.builder(file)
        .withMessage(Xxx.class)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withWriteMode(Mode.OVERWRITE)
        .withConf(conf);

try (var writer = builder.build()) {
    writer.write(pb.toBuilder());
}

If the config value is changed to true, it will succeed:

ProtoWriteSupport.setWriteSpecsCompliant(conf, true);

By looking at its source code, we can see that this method just sets the boolean value of parquet.proto.writeSpecsCompliant in the config.
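So setting the key directly on the Hadoop Configuration should be equivalent (same key name as above):

// Equivalent to ProtoWriteSupport.setWriteSpecsCompliant(conf, true)
conf.setBoolean("parquet.proto.writeSpecsCompliant", true);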

In ParquetProtoWriters.forType's source code, it creates a factory with the builder class ParquetProtoWriterBuilder, which uses org.apache.parquet.proto.ProtoWriteSupport internally. I guess you can somehow pass it a correctly configured ProtoWriteSupport; see the sketch below.
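For example, something along these lines might work as a drop-in replacement for ParquetProtoWriters.forType. This is an untested sketch: it assumes Flink's public ParquetWriterFactory / ParquetBuilder API and parquet-mr's ProtoParquetWriter.builder(OutputFile) overload, and the class name SpecCompliantProtoWriters is made up:

import com.google.protobuf.Message;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;

public final class SpecCompliantProtoWriters {

    // Like ParquetProtoWriters.forType, but forces the spec-compliant
    // LIST/MAP encoding so Avro-based readers can open the files.
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type) {
        ParquetBuilder<T> builder = out -> {
            Configuration conf = new Configuration();
            ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
            return ProtoParquetWriter.<T>builder(out)
                    .withMessage(type)
                    .withConf(conf)
                    .build();
        };
        return new ParquetWriterFactory<>(builder);
    }
}

You would then pass SpecCompliantProtoWriters.forType(classOf[Foo]) to forBulkFormat instead of the stock factory.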


I also installed this Python tool to inspect Parquet files: https://pypi.org/project/parquet-tools/. List fields generated in the older format look like:

...
############ Column(f2) ############
name: f2
path: f1.f2
...

and in the newer format look like:

...
############ Column(element) ############
name: element
path: f1.f2.list.element
...
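(The column dumps above come from the tool's schema-inspection output; running something like parquet-tools inspect foo.parquet on the two files should show the difference.)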

Hope this gives you some direction.

