Spring Kafka: filtering messages by header at the listener level

Published 2025-01-09 06:49:33


I have a legacy Kafka topic to which different types of messages are sent; these messages are written with a custom header whose value discriminates the record type.
In a given application I have multiple methods that I would like to annotate with a custom annotation such as @CustomKafkaListener(discriminator="xxx"), which itself would be meta-annotated with @KafkaListener.

How can I filter the messages so that, if two kinds of messages are sent to the central topic, the method annotated with discriminator "xxx" reads only those messages, while the method annotated with discriminator "yyy" reads only the "yyy" ones?

For example

    @CustomKafkaListener(discriminator="com.mypackage.subpackage", topic="central-topic")
    public void consumerMessagesXXX(ConsumerRecord r) {
        // reads only XXX messages, skips all others
    }

    @CustomKafkaListener(discriminator="com.mypackage", topic="central-topic")
    public void consumerMessagesYYY(ConsumerRecord r) {
        // reads only YYY messages, skips all others
    }

I would like the filter to be able to read the discriminator property of the target listener and decide dynamically whether a message should be processed by that listener, either via reflection or via some metadata provided to the filter, for example:

    public boolean filter(ConsumerRecord consumerRecord, Consumer<Long, Event> consumer) {
        // retrieve discriminator information either by reflection or metadata
        var discriminatorPattern = consumer.getMetadataXXX();
        return discriminatorPattern.matches(consumerRecord.headers().lastHeader("discriminator").value());
    }
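The dispatch decision the pseudocode above is aiming at can be made concrete in plain Java, with no Kafka or Spring types. Everything here (`shouldProcess`, the pattern strings, the class name) is illustrative only, not part of any real API:

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of the decision described above: match a listener's
// discriminator pattern against the record's "discriminator" header value.
// Names and patterns are illustrative only.
public class DiscriminatorMatch {

    /** Returns true when the header value matches the listener's regex pattern. */
    static boolean shouldProcess(String discriminatorPattern, byte[] headerValue) {
        if (headerValue == null) {
            return false; // record has no discriminator header: no listener claims it
        }
        return new String(headerValue, StandardCharsets.UTF_8).matches(discriminatorPattern);
    }

    public static void main(String[] args) {
        byte[] header = "com.mypackage.subpackage.EventA".getBytes(StandardCharsets.UTF_8);
        System.out.println(shouldProcess("com\\.mypackage\\.subpackage\\..*", header)); // true
        System.out.println(shouldProcess("com\\.otherpackage\\..*", header));           // false
    }
}
```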


相权↑美人 2025-01-16 06:49:33


Creating custom annotations is a pretty advanced topic; you would need to subclass the annotation bean post processor and come up with some mechanism to customize the endpoint by adding the filter strategy bean.

Feel free to open a new feature request on GitHub https://github.com/spring-projects/spring-kafka/issues

We could add a new property to @KafkaListener to pass the bean name of a RecordFilterStrategy bean.

EDIT

I see you opened an issue; thanks.

Here is a workaround that adds the filters later...

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300", autoStartup = "false")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300", autoStartup = "false")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> xxx() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "xxx".getBytes());
        };
    }

    @Bean
    RecordFilterStrategy<String, String> yyy() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "yyy".getBytes());
        };
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> xxx, RecordFilterStrategy<String, String> yyy,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);

            updateListener("xxx", xxx, registry);
            updateListener("yyy", yyy, registry);
            registry.start();
        };
    }

    private void updateListener(String id, RecordFilterStrategy<String, String> filter,
            KafkaListenerEndpointRegistry registry) {

        MessageListener listener = (MessageListener) registry.getListenerContainer(id).getContainerProperties()
                .getMessageListener();
        registry.getListenerContainer(id).getContainerProperties()
                .setMessageListener(new FilteringMessageListenerAdapter<>(listener, filter));
    }

}
1:test.to.xxx
2:test.to.yyy
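One detail worth calling out in the code above: `RecordFilterStrategy.filter()` returns `true` when a record should be discarded, which is why the lambdas negate the header comparison. A plain-Java stand-in for that logic (no Kafka classes; names are illustrative):

```java
import java.util.Arrays;

// Stand-in for the filter lambdas above. RecordFilterStrategy semantics:
// returning true means the record is DISCARDED, hence the negation.
public class FilterSemantics {

    /** Mirrors: which == null || !Arrays.equals(which.value(), discriminator.getBytes()) */
    static boolean discard(byte[] headerValue, String discriminator) {
        return headerValue == null || !Arrays.equals(headerValue, discriminator.getBytes());
    }

    public static void main(String[] args) {
        System.out.println(discard("xxx".getBytes(), "xxx")); // false -> delivered to the listener
        System.out.println(discard("yyy".getBytes(), "xxx")); // true  -> filtered out
        System.out.println(discard(null, "xxx"));             // true  -> filtered out
    }
}
```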

EDIT2

This version uses a single filter, with the consumer's group.id as the discriminator:

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> discriminator(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        RecordFilterStrategy<String, String> filter = rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), KafkaUtils.getConsumerGroupId().getBytes());
        };
        factory.setRecordFilterStrategy(filter);
        return filter;
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> discriminator,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);
        };
    }

}
1:test.to.xxx
2:test.to.yyy
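The single-filter version works because `KafkaUtils.getConsumerGroupId()` resolves the group id bound to the current consumer thread, so one shared filter instance behaves differently per listener container. The thread-local idea can be sketched in plain Java (an illustrative stand-in, not Spring Kafka's implementation):

```java
import java.util.Arrays;

// Illustrative stand-in for the shared-filter idea: the discriminator is
// resolved per consumer thread, so a single filter instance can serve
// every listener container.
public class GroupIdDiscriminator {

    private static final ThreadLocal<String> GROUP_ID = new ThreadLocal<>();

    /** Simulates binding the group id to a listener container thread. */
    static void bindGroupId(String groupId) {
        GROUP_ID.set(groupId);
    }

    /** Returns true when the record should be discarded for this thread's group. */
    static boolean discard(byte[] headerValue) {
        String groupId = GROUP_ID.get();
        return headerValue == null || !Arrays.equals(headerValue, groupId.getBytes());
    }

    public static void main(String[] args) {
        bindGroupId("xxx"); // as if running on the "xxx" container thread
        System.out.println(discard("xxx".getBytes())); // false -> delivered
        System.out.println(discard("yyy".getBytes())); // true  -> filtered out
    }
}
```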