Transforming a stream
I have a stream of GenericRecord values deserialised using Avro; the schema has two fields, name and age.
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("sucheth1")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
                ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://localhost:8081"))
        .build();
DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
I'm trying to map over each record and add a new field 'location' to the stream, but I get an error saying:
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: location
at org.apache.avro.generic.GenericData$Record.put(GenericData.java:242)
Is there a way I can transform the stream by adding new fields?
1 Answer
The issue here is Avro, not Flink. Your transformation (the map function) will need to emit records that use a different schema (one that includes the new field).
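The failure can be reproduced with plain Avro, outside of Flink, which shows it comes from GenericData.Record itself (the record name and values below are illustrative):

```java
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PutUnknownField {
    public static void main(String[] args) {
        // A schema with only "name" and "age", as in the question.
        Schema schema = SchemaBuilder.record("Person").fields()
                .requiredString("name")
                .requiredInt("age")
                .endRecord();

        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "alice");
        record.put("age", 30);

        // GenericData.Record only accepts fields declared in its schema,
        // so putting an undeclared field throws at this point.
        try {
            record.put("location", "Bangalore");
        } catch (AvroRuntimeException e) {
            System.out.println(e.getMessage()); // Not a valid schema field: location
        }
    }
}
```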
The question "Extend Avro schema via Java API by adding one field" may provide some helpful insight.
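A minimal sketch of that approach (EXTENDED_SCHEMA and withLocation are hypothetical names, and the location value is illustrative): build a second schema containing the original fields plus location, and have the map function copy each incoming record into a new GenericData.Record backed by that schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AddLocationField {
    // Hypothetical extended schema: the original "name" and "age" plus "location".
    static final Schema EXTENDED_SCHEMA = SchemaBuilder.record("Person").fields()
            .requiredString("name")
            .requiredInt("age")
            .requiredString("location")
            .endRecord();

    // Body of the map function: copy the incoming record into a new record
    // that uses the extended schema, then set the new field.
    static GenericRecord withLocation(GenericRecord in, String location) {
        GenericRecord out = new GenericData.Record(EXTENDED_SCHEMA);
        out.put("name", in.get("name"));
        out.put("age", in.get("age"));
        out.put("location", location);
        return out;
    }
}
```

In the Flink job this would be used as stream.map(r -> withLocation(r, ...)); with GenericRecord you will likely also need to declare the output type, e.g. with .returns(new GenericRecordAvroTypeInfo(EXTENDED_SCHEMA)) from flink-avro, since Flink cannot infer a serializer for GenericRecord on its own.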