Kafka Consumer 在简单的 Java 应用程序中工作,但在 Spring 上下文中失败
我定义了一个 kafka 消费者,如下面的代码所示。当我从 main 方法运行时,它运行、验证和消耗良好。但是,如果我将此类转换为 spring 组件,则在启动 spring 应用程序时身份验证将失败。该消费者是我无法运行的较大 Spring 应用程序的一部分。为了尝试隔离问题,我创建了一个简单的 java 类(如下)来仅测试消费者。它在这种情况下工作得很好,但在更大的 Spring 应用程序中却失败了。
应用程序重复打印以下行:
org.apache.kafka.clients.NetworkClient: Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
org.apache.kafka.clients.NetworkClient: Bootstrap broker server1.my_domain.com disconnected
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SampleConsumer {
public static final String BOOTSTRAP_SERVERS = "server1.my_domain.com";
public static final String TOPIC = "SOME_TOPIC";
public static final String CONSUMER_GROUP = "SOME_GROUP";
public static void start(){
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("sasl.jaas.config", getJaasConfig());
final KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
final int giveUp = 100;
int noRecordCount = 0;
while(true){
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if(consumerRecords.count() == 0){
noRecordCount++;
if(noRecordCount > giveUp) break;
else continue;
}
consumerRecords.forEach(System.out::println);
}
}
private static String getJaasConfig(){
return "com.sun.security.auth.module.Krb5LoginModule required"
+ " serviceName=\"kafka\""
+ " doNotPrompt=true"
+ " useKeyTab=true"
+ " storeKey=true"
+ " principal=\"[email protected]\""
+ " keyTab=\"D:/certs/sampleKeyTab\";";
}
public static void main(String[] args) {
start();
}
}
I defined a kafka consumer as seen in the code below. It runs, authenticates, and consumes fine when I run from the main method. If however I convert this class into a spring component, authentication fails when the spring app is started. This consumer is part of a larger spring application that I was not able to get running. In attempts to isolate the problem, I created a simple java class (below) to test only the consumer. It works well in this context, but fails in a larger Spring app.
The application prints the following line repeatedly:
org.apache.kafka.clients.NetworkClient: Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
org.apache.kafka.clients.NetworkClient: Bootstrap broker server1.my_domain.com disconnected
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SampleConsumer {
public static final String BOOTSTRAP_SERVERS = "server1.my_domain.com";
public static final String TOPIC = "SOME_TOPIC";
public static final String CONSUMER_GROUP = "SOME_GROUP";
public static void start(){
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("sasl.jaas.config", getJaasConfig());
final KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
final int giveUp = 100;
int noRecordCount = 0;
while(true){
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if(consumerRecords.count() == 0){
noRecordCount++;
if(noRecordCount > giveUp) break;
else continue;
}
consumerRecords.forEach(System.out::println);
}
}
private static String getJaasConfig(){
return "com.sun.security.auth.module.Krb5LoginModule required"
+ " serviceName=\"kafka\""
+ " doNotPrompt=true"
+ " useKeyTab=true"
+ " storeKey=true"
+ " principal=\"[email protected]\""
+ " keyTab=\"D:/certs/sampleKeyTab\";";
}
public static void main(String[] args) {
start();
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在您发布的代码片段中,您使用了静态方法,我想您的 spring 组件中有这些方法的非静态版本,因为静态方法在 spring 组件中非常不典型。
检查您的 spring 组件类是否已正确初始化,即所有必需的属性均已设置,以及您是否正在捕获并抑制异常。
如果您没有使用 spring-kafka 并依赖于普通的 KafkaConsumer ,那么 spring 无法控制您创建的 KafkaConsumer ,也就是说,它不管理它,所以它是否是在 spring 组件中编写的否则,它应该表现出 相同的。
如果您创建了 Spring 组件,通常
start()
逻辑应该位于@PostConstruct
排除连续轮询(阻止后构造)。调试:
In the snippet you have posted, you have used static methods, I suppose you have non-static versions of those in your spring component since static methods are quite atypical in spring components.
Check if your spring component class is properly initialized i.e. all the required properties are set and if you are catching and supressing the exceptions.
If you are not using spring-kafka and relied on plain
KafkaConsumer
then spring has no control over yourKafkaConsumer
that you create i.e. to say, it does not manage it, so whether it is written in a spring component or otherwise, it should behave the same.If you have created a Spring component, typically the
start()
logic ought to be in the@PostConstruct
excluding the continuous polling (which blocks the post-construct).Debugging: