Consuming JSON with Kafka Streams that I want to split and send to two different topics

Posted on 2025-01-11 13:49:39 · 1712 characters · 0 views · 0 comments

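I have a large JSON document coming in from a Kafka topic. I convert it to a Java object so I can pull out just the values I need in the database. Some of the records contain an array of defects that I need to capture and send to a different topic, so that they end up in their own table in the DB. Values are inserted into the DB by a sink connector, which is why we use multiple topics.

I have found branching and split, but they look like they are meant for deciding which topic a consumed record should go to, not for sending different parts of one record to different topics. Is there a way to do this, or do I need to change my architecture somewhere?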

    @Autowired
    void buildPipeline(StreamsBuilder builder) throws Exception {
        KStream<String, String> messageStream =
                builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
        logger.info("started consumer");
        System.out.println(messageStream.toString());

        KStream<String, String> auditRecords = messageStream
                .map((key, value) -> {
                    try {
                        return new KeyValue<>("", mapStringToAuditOutput(value));
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                        // map() must not return null, so emit a null value and drop it below
                        return new KeyValue<>("", (String) null);
                    }
                });
        auditRecords
                .filter((key, value) -> value != null) // skip records that failed to parse
                .to(outputTopic);
    }

    public String mapStringToAuditOutput(String input) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        // Let parse errors propagate to the caller instead of continuing with a null object
        AuditOutput auditResults = mapper.readValue(input, AuditOutput.class);
        // check for multiple defects
        if (auditResults.getCaseObject().getDefects() != null) {
            // send defects to separate topic

        }
        String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
        System.out.println(auditJson);
        return auditJson;
    }

Comments (1)

故事还在继续 2025-01-18 13:49:39

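"I have found branching and split, but it looks like that is more for determining which topic a consumed record should go to"

Correct. To send parts of an event, or the whole event, to different topics, you need filter plus map/mapValues before the branching.

More specifically, create intermediate KStream instances and call to() multiple times.

For example: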

    // Change the value serde to use your JSON class
    KStream<String, AuditOutput> auditRecords = messageStream
        .mapValues(value -> {
            try {
                // The input stream can be <String, String>, but this step would be automatic with a JSON serde.
                // Assumes mapStringToAuditOutput is changed to return the parsed AuditOutput instead of a String.
                return mapStringToAuditOutput(value);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        })
        .filter((key, value) -> Objects.nonNull(value)); // remove bad JSON events

    Map<String, KStream<String, AuditOutput>> branches = auditRecords
        .split(Named.as("Branch-"))
        .branch((key, value) -> value.getCaseObject().getDefects() != null, /* first predicate */
            Branched.as("Defects"))
        .defaultBranch(Branched.as("NoDefects")); /* default branch */

    branches.get("Branch-Defects").to(...)
    branches.get("Branch-NoDefects").mapValues(... flatten ... ).to(...)
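
The "intermediate KStream instances plus several to() calls" idea also covers the case where one record has to feed both topics. The following is only a sketch of the per-record logic, written as plain Java so it runs without a broker. The `Audit` class, its `caseId` and `defects` fields, and the hand-built JSON strings are hypothetical stand-ins for the real `AuditOutput` and its serializer. The comments show how the two functions might be wired onto the same parsed stream, using `mapValues` for the single audit row and `flatMapValues` for the zero-or-more defect rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitSketch {

    /** Hypothetical stand-in for the parsed AuditOutput; field names are assumptions. */
    public static final class Audit {
        final String caseId;
        final List<String> defects; // null when the record carries no defects

        public Audit(String caseId, List<String> defects) {
            this.caseId = caseId;
            this.defects = defects;
        }
    }

    /** The part of the record meant for the main topic: exactly one row, defects left out. */
    public static String auditRow(Audit audit) {
        return "{\"caseId\":\"" + audit.caseId + "\"}";
    }

    /** The part meant for the defects topic: one row per defect, or an empty list. */
    public static List<String> defectRows(Audit audit) {
        List<String> rows = new ArrayList<>();
        if (audit.defects != null) {
            for (String defect : audit.defects) {
                rows.add("{\"caseId\":\"" + audit.caseId + "\",\"defect\":\"" + defect + "\"}");
            }
        }
        return rows;
    }

    // Possible wiring in the topology, where `parsed` is a KStream<String, Audit>
    // and the topic names are placeholders:
    //
    //   parsed.mapValues(SplitSketch::auditRow).to(auditTopic);
    //   parsed.flatMapValues(SplitSketch::defectRows).to(defectTopic);
    //
    // Both operators read the same upstream stream, so every record reaches both of them.

    public static void main(String[] args) {
        Audit audit = new Audit("C-1", Arrays.asList("scratch", "dent"));
        System.out.println(auditRow(audit));
        defectRows(audit).forEach(System.out::println);
    }
}
```

Because `flatMapValues` emits nothing for an empty list, records without defects would simply produce no output on the defects topic, so no branch is needed for that case in this sketch.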