ParquetPrototowriter创建一个不可读的镶木木材文件
我的.proto文件包含一个地图类型字段。
Message Foo {
...
...
map<string, uint32> fooMap = 19;
}
我正在消费来自Kafka源的消息,并试图将消息写入S3存储桶中的镶木quet文件。 代码的相关部分看起来像这样:
val basePath = "s3a:// ..."
env
.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
.map(x => toJavaProto(x))
.sinkTo(
FileSink
.forBulkFormat(basePath, ParquetProtoWriters.forType(classOf(Foo)))
.withOutputFileConfig(
OutputFileConfig
.builder()
.withPartPrefix("foo")
.withPartSuffix(".parquet")
.build()
)
.build()
)
.setParallelism(1)
env.execute()
结果是镶木quet文件实际上是为S3编写的,但该文件似乎已损坏。当我尝试使用Avro / Parquet查看器插件读取文件时,我可以看到此错误:
无法处理文件 .../downloads/foo-9366c15f-270e-4939-AD88-B77EE27DDC2F-0.Parquet java.lang.unsupportedoperationException:重复不支持 外部列表或地图。类型:重复组foomap = 19 {可选 二进制键(字符串)= 1;可选的INT32值= 2; } 在 org.apache.parquet.avro.avroschemaconverter.convertfields(avroschemaconverter.java:277) 在 org.apache.parquet.avro.avroschemaconverter.convert(avroschemaconverter.java:264) 在 org.apache.parquet.avro.avroreadsupport.prepareforread(avroreadsupport.java:134) 在 org.apache.parquet.hadoop.internalparquetrecordreader.initialize(InternalParquetreCordReader.java:185) 在 org.apache.parquet.hadoop.parquetreader.initreader(parquetreader.java:156) 在 org.apache.parquet.hadoop.parquetreader.read(parquetreader.java:135) 在 uk.co.hadoopathome.intellij.viewer.fileformat.parquetfilereader.getRecords(parquetfilereader.java:99) 在 uk.co.hadoopathome.intellij.viewer.fileviewertoolwindow $ 2.Doinbackground(fileviewertoolwindow.java:193) 在 uk.co.hadoopathome.intellij.viewer.fileviewertoolwindow $ 2.doinbackground(fileviewertoolwindow.java:184) 在java.desktop/javax.swing.swingworker $ 1.CALL(swingworker.java:304) 在java.base/java.util.concurrent.futuretask.run(futuretask.java:264) 在java.desktop/javax.swing.swingworker.run(swingworker.java:343)at java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1128) 在 java.base/java.util.concurrent.threadpoolexecutor $ worker.run(threadpoolexecutor.java:628) 在java.base/java.lang.thread.run(thread.java:829)
Flink版本1.15 原始2
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
parquet-format
和parquet-mr
中有一些破裂的变化。我不熟悉Flink,但我想您必须配置org.apache.flink.formats.parquet.protobuf.parquetprotowriters
以生成正确的格式。我直接使用了
Parquet-Mr
,并遇到了同一问题。 Avro Parquet读取器无法读取以下代码生成的镶木quet文件:如果将配置值更改为
true
,它将成功:通过查看其源代码,我们可以知道此函数适用于在配置中设置
parquet.proto.writespecscompliant
在
parquetprotowriters.fortype
's 源代码,它使用构建器类parquetprotototototototOtoWriterBuiliterBuilderBuilder>,使用
> org.apache.parquet.proto.protowritesupport
内部。我想您可以以某种方式将其分配给它的正确配置ProtoWritesUpport
。我还安装了此Python工具: https://pypi.org/project/project/parquet/parquet-tools/ 检查镶木quet文件。以较旧格式生成的列表字段将就像:
以较新的格式一样:
希望本文可以为您提供一些指导。
参考
There are some breaking changes in
parquet-format
andparquet-mr
. I'm not familiar with Flink, but I guess you have to configureorg.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
to generate a correct format.I used
parquet-mr
directly and encountered the same issue. An avro parquet reader is unable to read the parquet file generated by the following code:If the config value is changed to
true
, it will succeed:By looking its source code, we can know this function is for setting the boolean value of
parquet.proto.writeSpecsCompliant
in the config.In
ParquetProtoWriters.forType
's source code, it create a factory with builder classParquetProtoWriterBuilder
, which usesorg.apache.parquet.proto.ProtoWriteSupport
internally. I guess you can somehow assign it with a correctly configuredProtoWriteSupport
to it.I also install this Python tool: https://pypi.org/project/parquet-tools/ to inspect parquet files. List fields generated in older format will be like:
and in newer format will be like:
Hope this article could give you some direction.
References