Using Spring EL to add an optional suffix from a property to the consumer group in @KafkaListener

Posted on 2025-01-10 12:54:51

I have a simple Spring Boot application with Kafka consumers that look like this:

@KafkaListener(topics="topic", groupId="SOME_CONSTANT") {
....
}

What I need to do is add an optional Spring Boot property (it comes from environment variables, but that is not important), let's say:
myapp.env: TEST

When that property is present, the consumer group should automatically become
SOME_CONSTANT-TEST

I am experimenting with SpEL:

@KafkaListener(topics="topic", groupId="#{ '${myApp.env}' == null ? 'SOME_CONSTANT' : 'SOME_CONSTANT' + '-' + '${myApp.env}}'") {
....
}

But that does not seem to work. Any ideas?

Comments (1)

(り薆情海 2025-01-17 12:54:51

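You can use the T operator to read the constant's value, and give the property placeholder an empty default with a trailing colon (':') for the case when the variable is not set. The placeholder is resolved before the SpEL expression is evaluated, so the comparison is against the empty string '' rather than null: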

@KafkaListener(topics="topic", groupId="#{ '${my.app.env:}' == '' ? T(com.mypackage.MyListener).SOME_CONSTANT : T(com.mypackage.MyListener).SOME_CONSTANT + '-' + '${my.app.env:}'}")

Here's a sample application with this solution:

package org.spring.kafka.playground;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class SO71291726 {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(SO71291726.class, args);
        try {
            Thread.sleep(10000);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag; Thread.interrupted() would only clear it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted");
        }
        KafkaOperations kafkaTemplate = context.getBean("kafkaTemplate", KafkaOperations.class);
        kafkaTemplate.send("topic", "My message");
    }

    Logger log = LoggerFactory.getLogger(this.getClass());

    public static final String SOME_CONSTANT = "my-group-id-constant";

    @Component
    class MyListener {

        @KafkaListener(topics="topic", groupId="#{ '${71291726.my.app.env:}' == '' ? T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT : T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT + '-' + '${71291726.my.app.env:}'}")
        void listen(String message, @Header(KafkaHeaders.GROUP_ID) String groupId) {
            log.info("Received message {} from group id {} ", message, groupId);
        }
    }
}

Output:
2022-02-28 14:26:14.733 INFO 18841 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$cf264156 : Received message My message from group id my-group-id-constant

If I add 71291726.my.app.env = TEST to the application.properties file:

2022-02-28 14:34:03.900 INFO 18870 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$e1a5933e : Received message My message from group id my-group-id-constant-TEST
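
If the inline ternary gets hard to read, one possible variation is to move the suffix logic into a small static helper and call it through the same T operator. The sketch below is only an illustration: GroupIds and withOptionalSuffix are hypothetical names that do not come from the question or from Spring, and the annotation shown in the comment assumes the helper is declared public in the com.mypackage package.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not part of Spring or of the original code).
 * Assumed usage from the listener, with the class declared public in com.mypackage:
 *
 *   @KafkaListener(topics = "topic",
 *       groupId = "#{T(com.mypackage.GroupIds).withOptionalSuffix('SOME_CONSTANT', '${my.app.env:}')}")
 */
final class GroupIds {

    private GroupIds() {
        // static utility, no instances
    }

    /** Returns base unchanged when suffix is null or blank, otherwise base + "-" + suffix. */
    public static String withOptionalSuffix(String base, String suffix) {
        Objects.requireNonNull(base, "base");
        if (suffix == null || suffix.trim().isEmpty()) {
            return base;
        }
        return base + "-" + suffix.trim();
    }

    public static void main(String[] args) {
        // Property not set: the placeholder default yields an empty string.
        System.out.println(withOptionalSuffix("my-group-id-constant", ""));
        // Property set to TEST.
        System.out.println(withOptionalSuffix("my-group-id-constant", "TEST"));
    }
}
```

The helper keeps the annotation short and lets the suffix rule be unit-tested without starting a Kafka container; the empty-default placeholder '${my.app.env:}' is still needed so that startup does not fail when the property is missing.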
