Using Kafka Streams to split a JSON message and send parts to two different topics
I have a large JSON message coming in from a Kafka topic that I am converting to a Java object so I can pull out just the values I need from the lookup DB. Some of the records will have an array of defects in them that I need to capture and send to a different topic so they can end up in their own table in the DB. Values are inserted into the DB using a sink connector; that is why we are using multiple topics.
I have found branching and split, but it looks like those are more for determining which topic a consumed record should go to, not for sending different parts of the record to different topics. Is there a way to do this, or do I need to change my architecture somewhere?
@Autowired
void buildPipeline(StreamsBuilder builder) throws Exception {
    KStream<String, String> messageStream = builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
    logger.info("started consumer");
    System.out.println(messageStream.toString());
    KStream<String, String> auditRecords = messageStream
            .map((key, value) -> {
                try {
                    return new KeyValue<>("", mapStringToAuditOutput(value));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return null;
            });
    auditRecords.to(outputTopic);
}
public String mapStringToAuditOutput(String input) throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    AuditOutput auditResults = null;
    try {
        auditResults = mapper.readValue(input, AuditOutput.class);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    // check for multiple defects
    if (auditResults.getCaseObject().getDefects() != null) {
        // send defects to separate topic
    }
    String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
    System.out.println(auditJson);
    return auditJson;
}
1 Answer
Correct. You need to filter + map/mapValues prior to the branching in order to send parts of events (or whole events) to different topics.
More specifically, create intermediate KStream instances and call to() multiple times. For example:
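A minimal sketch of that idea, derived from the question's code: two intermediate KStream instances are built from the same source stream, and each one gets its own to() call. The topic names (`audit-input`, `audit-output`, `defect-output`) and the JSON field path `caseObject.defects` are assumptions taken from the question, not fixed API names; the audit-side mapValues is left as a pass-through where the real code would flatten the record.

```java
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class AuditTopology {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    static void buildPipeline(StreamsBuilder builder) {
        // Topic names are placeholders for the question's inputTopic/outputTopic.
        KStream<String, String> messageStream =
                builder.stream("audit-input", Consumed.with(Serdes.String(), Serdes.String()));

        // Intermediate stream 1: the audit record itself
        // (pass-through here; the real code would flatten/map it).
        KStream<String, String> auditRecords = messageStream.mapValues(value -> value);

        // Intermediate stream 2: each element of caseObject.defects
        // becomes its own record; records without defects emit nothing.
        KStream<String, String> defectRecords =
                messageStream.flatMapValues(AuditTopology::extractDefects);

        // Two to() calls: each intermediate stream writes to its own topic.
        auditRecords.to("audit-output", Produced.with(Serdes.String(), Serdes.String()));
        defectRecords.to("defect-output", Produced.with(Serdes.String(), Serdes.String()));
    }

    // Returns one JSON string per defect; an empty list when there are no
    // defects or the payload is malformed (so the record is simply dropped,
    // instead of returning null and crashing the stream thread).
    static List<String> extractDefects(String value) {
        List<String> defects = new ArrayList<>();
        try {
            JsonNode array = MAPPER.readTree(value).path("caseObject").path("defects");
            if (array.isArray()) {
                for (JsonNode defect : array) {
                    defects.add(defect.toString());
                }
            }
        } catch (Exception e) {
            // Malformed JSON: emit nothing for this record.
        }
        return defects;
    }
}
```

Note that flatMapValues also fixes a hazard in the question's code: returning null from map() causes Kafka Streams to throw at runtime, whereas returning an empty list just drops the record.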