Kafka Consumer 在简单的 Java 应用程序中工作,但在 Spring 上下文中失败

发布于 2025-01-10 02:39:00 字数 2916 浏览 0 评论 0原文

我定义了一个 kafka 消费者,如下面的代码所示。当我从 main 方法运行时,它运行、验证和消耗良好。但是,如果我将此类转换为 spring 组件,则在启动 spring 应用程序时身份验证将失败。该消费者是我无法运行的较大 Spring 应用程序的一部分。为了尝试隔离问题,我创建了一个简单的 java 类(如下)来仅测试消费者。它在这种情况下工作得很好,但在更大的 Spring 应用程序中却失败了。

应用程序重复打印以下行:

org.apache.kafka.clients.NetworkClient: Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

org.apache.kafka.clients.NetworkClient: Bootstrap broker server1.my_domain.com disconnected
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SampleConsumer {

    public static final String BOOTSTRAP_SERVERS = "server1.my_domain.com";
    public static final String TOPIC = "SOME_TOPIC";
    public static final String CONSUMER_GROUP = "SOME_GROUP";

    public static void start(){
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("sasl.jaas.config", getJaasConfig());

        final KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        final int giveUp = 100;
        int noRecordCount = 0;

        while(true){
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            if(consumerRecords.count() == 0){
                noRecordCount++;
                if(noRecordCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(System.out::println);
        }
    }

    private static String getJaasConfig(){
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " serviceName=\"kafka\""
                + " doNotPrompt=true"
                + " useKeyTab=true"
                + " storeKey=true"
                + " principal=\"[email protected]\""
                + " keyTab=\"D:/certs/sampleKeyTab\";";
    }

    public static void main(String[] args) {
        start();
    }
}

I defined a kafka consumer as seen in the code below. It runs, authenticates, and consumes fine when I run from the main method. If however I convert this class into a spring component, authentication fails when the spring app is started. This consumer is part of a larger spring application that I was not able to get running. In attempts to isolate the problem, I created a simple java class (below) to test only the consumer. It works well in this context, but fails in a larger Spring app.

The application prints the following line repeatedly:

org.apache.kafka.clients.NetworkClient: Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

org.apache.kafka.clients.NetworkClient: Bootstrap broker server1.my_domain.com disconnected
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SampleConsumer {

    public static final String BOOTSTRAP_SERVERS = "server1.my_domain.com";
    public static final String TOPIC = "SOME_TOPIC";
    public static final String CONSUMER_GROUP = "SOME_GROUP";

    public static void start(){
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("sasl.jaas.config", getJaasConfig());

        final KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        final int giveUp = 100;
        int noRecordCount = 0;

        while(true){
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            if(consumerRecords.count() == 0){
                noRecordCount++;
                if(noRecordCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(System.out::println);
        }
    }

    private static String getJaasConfig(){
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " serviceName=\"kafka\""
                + " doNotPrompt=true"
                + " useKeyTab=true"
                + " storeKey=true"
                + " principal=\"[email protected]\""
                + " keyTab=\"D:/certs/sampleKeyTab\";";
    }

    public static void main(String[] args) {
        start();
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

荒芜了季节 2025-01-17 02:39:00

在您发布的代码片段中,您使用了静态方法,我想您的 spring 组件中有这些方法的非静态版本,因为静态方法在 spring 组件中非常不典型。

检查您的 spring 组件类是否已正确初始化,即所有必需的属性均已设置,以及您是否正在捕获并抑制异常。

如果您没有使用 并依赖于普通的 KafkaConsumer ,那么 spring 无法控制您创建的 KafkaConsumer ,也就是说,它不管理它,所以它是否是在 spring 组件中编写的否则,它应该表现出 相同的。

如果您创建了 Spring 组件,通常 start() 逻辑应该位于 @PostConstruct 排除连续轮询(阻止后构造)。

调试:

如果您依赖 spring 来获取 KafkaConsumer 属性,
确保它们在您的 application.properties 中正确设置或
无论他们从哪里被接走。您可以打印以下属性
消费者和检查。

In the snippet you have posted, you have used static methods, I suppose you have non-static versions of those in your spring component since static methods are quite atypical in spring components.

Check if your spring component class is properly initialized i.e. all the required properties are set and if you are catching and supressing the exceptions.

If you are not using and relied on plain KafkaConsumer then spring has no control over your KafkaConsumer that you create i.e. to say, it does not manage it, so whether it is written in a spring component or otherwise, it should behave the same.

If you have created a Spring component, typically the start() logic ought to be in the @PostConstruct excluding the continuous polling (which blocks the post-construct).

Debugging:

If you are relying on spring to fetch KafkaConsumer properties,
ensure that they are properly set in your application.properties or
from where-ever they are picked up. You can print the properties of
both the consumers and check.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文