How can I customise ID generation for @KafkaListener?

Posted on 2025-02-10 02:45:11

I need to customise the id of each method annotated with @KafkaListener, based on the value of one of its attributes combined with a value defined in application.yaml. For example:

Given a class with a method annotated like so:

@KafkaListener(info="myInfo")
public void listen(String msg){

}

And a custom property in my application.yaml:

myapp:
  myProperty: myProp

At runtime I would like the id of the registered listener endpoint to be myInfo_myProp, rather than the auto-generated one that is used when I do not explicitly provide an id in the annotation attributes.

What would be the best way to achieve this? I was thinking of extending the KafkaListenerEndpointRegistrar or the BeanPostProcessor.

Thanks


Comments (2)

旧街凉风 2025-02-17 02:45:11

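Override the endpoint registry bean and manipulate the endpoint properties there, before delegating to the superclass method.

Here's an example:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.AbstractKafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
public class So72719215Application {

    public static void main(String[] args) {
        SpringApplication.run(So72719215Application.class, args);
    }

    // Replaces the default registry bean; the placeholder uses the key exactly as written in application.yaml.
    @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    KafkaListenerEndpointRegistry registry(@Value("${myapp.myProperty}") String myProperty) {
        return new KafkaListenerEndpointRegistry() {

            @Override
            public void registerListenerContainer(KafkaListenerEndpoint endpoint,
                    KafkaListenerContainerFactory<?> factory) {

                AbstractKafkaListenerEndpoint<?, ?> akle = (AbstractKafkaListenerEndpoint<?, ?>) endpoint;
                // The listener info is held as bytes; this assumes every listener sets the info attribute.
                String info = new String(akle.getListenerInfo(), StandardCharsets.UTF_8);
                akle.setId(info + "_" + myProperty);
                akle.setGroupId("group_" + myProperty);
                // Register only after the id and group id have been changed.
                super.registerListenerContainer(endpoint, factory);
            }

        };
    }

    @KafkaListener(topics = "so72719215", info = "foo")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so72719215").partitions(1).replicas(1).build();
    }

    // Prints the registered container ids at startup so the customised id can be checked.
    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry reg) {
        return args -> {
            System.out.println(reg.getListenerContainerIds());
        };
    }

}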
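
The override above builds the id directly from the listener info, so it relies on every listener setting the info attribute. As a rough sketch of how the same composition could be made tolerant of a missing info value, the helper below isolates the string logic in plain Java. The class name ListenerIds and the method compose are hypothetical and not part of Spring Kafka, and it is only an assumption here that getListenerInfo() yields null when info is not set.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spring Kafka.
public class ListenerIds {

    // Builds "<info>_<suffix>" from the raw listener info bytes.
    // Returns fallbackId unchanged when no info was supplied.
    public static String compose(byte[] listenerInfo, String suffix, String fallbackId) {
        if (listenerInfo == null || listenerInfo.length == 0) {
            return fallbackId;
        }
        String info = new String(listenerInfo, StandardCharsets.UTF_8);
        return info + "_" + suffix;
    }

    public static void main(String[] args) {
        byte[] info = "myInfo".getBytes(StandardCharsets.UTF_8);
        System.out.println(compose(info, "myProp", "generated-id")); // prints myInfo_myProp
        System.out.println(compose(null, "myProp", "generated-id")); // prints generated-id
    }
}
```

Inside registerListenerContainer this might be used as akle.setId(ListenerIds.compose(akle.getListenerInfo(), myProperty, akle.getId())), so that listeners without info keep whatever id they already had.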
爱给你人给你 2025-02-17 02:45:11

See the Javadoc of the id() attribute of @KafkaListener:

/**
 * The unique identifier of the container for this listener.
 * <p>If none is specified an auto-generated id is used.
 * <p>Note: When provided, this value will override the group id property
 * in the consumer factory configuration, unless {@link #idIsGroup()}
 * is set to false or {@link #groupId()} is provided.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

So you can use SpEL and property placeholders to define a dynamic id for the target listener container. Something like:

@KafkaListener(id = "#{myBean.generateId('${property.from.env}')}")
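
The expression above calls a bean named myBean that the answer does not show. A minimal sketch of what such a bean might look like follows; the class name ListenerIdGenerator and the two-argument generateId signature are assumptions made for illustration, and the Spring annotations are left out so that the class stands alone.

```java
// Hypothetical bean; in a Spring application it would be registered under the
// name "myBean", for example with @Component("myBean").
public class ListenerIdGenerator {

    // Intended to be called from SpEL in the annotation, for example:
    // id = "#{myBean.generateId('myInfo', '${myapp.myProperty}')}"
    public String generateId(String info, String propertyValue) {
        return info + "_" + propertyValue;
    }

    public static void main(String[] args) {
        ListenerIdGenerator generator = new ListenerIdGenerator();
        System.out.println(generator.generateId("myInfo", "myProp")); // prints myInfo_myProp
    }
}
```

For a plain concatenation like this one, a bean is probably unnecessary: since the Javadoc states that property placeholders are supported, an id such as "myInfo_${myapp.myProperty}" should resolve to the same value. A bean becomes useful when the id needs logic beyond string joining.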