Kafka Connect。如何处理自定义对象列表,指定模式和构建sourcerecord值
我有DTO Cryptonews。其中包含
List<Currencies> currencies
我想将“货币” 字段保存到 sourcerecord 时。 无法弄清楚如何:
- 在模式中声明它。
- 建筑价值时将其传递给结构对象。
我的尝试在这个例外结束了: 架构类型结构的无效Java对象:com.dto.currencies类
kafka connect不提供明确的示例,当列表中的对象需要其架构时,如何处理案例。 我还尝试采用与Kafka测试用例相似的方法,但行之有效。 https://github.com/apache/kafka/kafka/blob/trunk/connect/api/src/src/test/java/java/java/org/opache/kafkace/kafka/connect/data/structta/sstructtest.java-uncstrate.java-ul95-l95-l95-l98 < /a>
如何做?
kafka-connect-api version: 0.10.2.0-cp1
value and key converter: org.apache.kafka.connect.json.JsonConverter
no avro used
CryptoNews implements Serializable {
// omitted fields
private List<Currencies> currencies;
}
class Currencies {
private String code;
private String title;
private String slug;
private String url;
}
shemaconfiguration
public static final Integer FIRST_VERSION = 1;
public static final String CURRENCIES_SCHEMA_NAME = "currencies";
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
.version(FIRST_VERSION)
.field(CURRENCIES_SCHEMA_NAME, CURRENCIES_SCHEMA)
// simple fields ommited for brevity.
.build();
public static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(
SchemaBuilder.struct()
.field(CODE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build()
)
.optional()
.name(CURRENCIES_SCHEMA_NAME)
.version(FIRST_VERSION)
.build();
sourcetask
return new SourceRecord(
sourcePartition(),
sourceOffset(cryptoNews),
config.getString(TOPIC_CONFIG),
null,
CryptoNewsSchema.NEWS_KEY_SCHEMA,
buildRecordKey(cryptoNews),
CryptoNewsSchema.NEWS_SCHEMA,
buildRecordValue(cryptoNews),
Instant.now().toEpochMilli()
);
public Struct buildRecordValue(CryptoNews cryptoNews){
Struct valueStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA);
// Produces Invalid Java object for schema type STRUCT: class com.dto.Currencies
List<Currencies> currencies = cryptoNews.getCurrencies();
if (currencies != null) {
valueStruct.put(CurrenciesSchema.CURRENCIES_SCHEMA_NAME, currencies);
}
return valueStruct;
}
更新:
worker.properties
bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000
I have dto CryptoNews. Which contains
List<Currencies> currencies
I would like to save "currencies" field to SourceRecord when constructing it.
Can't figure out how to:
- Declare it in schema.
- Pass it to Struct object when building value.
My attempts end in this exception:
Invalid Java object for schema type STRUCT: class com.dto.Currencies
Kafka Connect doesn't provide explicit example how to do handle case, when object in List requires it's own Schema.
I also tried to apply similar approach as in Kafka test cases, but it doesn't work. https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98
How to do this?
kafka-connect-api version: 0.10.2.0-cp1
value and key converter: org.apache.kafka.connect.json.JsonConverter
no avro used
CryptoNews implements Serializable {
// omitted fields
private List<Currencies> currencies;
}
class Currencies {
private String code;
private String title;
private String slug;
private String url;
}
SchemaConfiguration
public static final Integer FIRST_VERSION = 1;
public static final String CURRENCIES_SCHEMA_NAME = "currencies";
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
.version(FIRST_VERSION)
.field(CURRENCIES_SCHEMA_NAME, CURRENCIES_SCHEMA)
// simple fields ommited for brevity.
.build();
public static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(
SchemaBuilder.struct()
.field(CODE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build()
)
.optional()
.name(CURRENCIES_SCHEMA_NAME)
.version(FIRST_VERSION)
.build();
SourceTask
return new SourceRecord(
sourcePartition(),
sourceOffset(cryptoNews),
config.getString(TOPIC_CONFIG),
null,
CryptoNewsSchema.NEWS_KEY_SCHEMA,
buildRecordKey(cryptoNews),
CryptoNewsSchema.NEWS_SCHEMA,
buildRecordValue(cryptoNews),
Instant.now().toEpochMilli()
);
public Struct buildRecordValue(CryptoNews cryptoNews){
Struct valueStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA);
// Produces Invalid Java object for schema type STRUCT: class com.dto.Currencies
List<Currencies> currencies = cryptoNews.getCurrencies();
if (currencies != null) {
valueStruct.put(CurrenciesSchema.CURRENCIES_SCHEMA_NAME, currencies);
}
return valueStruct;
}
UPDATE:
worker.properties
bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您需要提供
list&lt; struct&gt;
是一个完整的单元测试示例
首先
,一个将有助于替代方法的接口就是仅使用字符串架构,并使用Jackson
Objectmmapper
要获取JSON字符串,然后让JSONCONVERTER
处理其余部分。You need to provide a
List<Struct>
Here's a full unit test example
First, an interface that will help
The alternative approach is to just use a String schema, and use Jackson
ObjectMapper
to get a JSON string, then letJSONConverter
handle the rest.