Embedded Kafka broker with the Spring Integration Kafka DSL
I am using the Kafka message-driven channel adapter to read messages from a Kafka topic. It works fine, but when I use an embedded Kafka broker for an integration test, it stops working and does not listen to the topic.
Here is the code for the listener:
public IntegrationFlow topic1ListenerKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer(),
                            KafkaMessageDrivenChannelAdapter.ListenerMode.batch)
                    .batchMessageConverter(batchMessageConverter)
                    .errorChannel(errorChannel)
                    .outputChannel(inputQueueChannel))
            .get();
}

public AbstractMessageListenerContainer kafkaMessageListenerContainer() {
    AbstractMessageListenerContainer kafkaMessageListenerContainer =
            new KafkaMessageListenerContainer(consumerFactory(), containerProperties());
    kafkaMessageListenerContainer.setAutoStartup(true);
    return kafkaMessageListenerContainer;
}

private ContainerProperties containerProperties() {
    ContainerProperties properties = new ContainerProperties("TEST_TOPIC");
    properties.setAckMode(ContainerProperties.AckMode.BATCH);
    properties.setConsumerTaskExecutor(createDefaultTaskExecutor());
    return properties;
}

private ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> properties = new HashMap<>();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumerGroup-1");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new DefaultKafkaConsumerFactory<>(properties);
}
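One thing that may be worth ruling out is an address mismatch between the consumer and the embedded broker, since the consumer address is hard-coded here. The following is only a minimal sketch, not a confirmed fix: it assumes the test can pass in whatever address the embedded broker reports (for example the value of `EmbeddedKafkaBroker.getBrokersAsString()`). The class name `ConsumerProps` and the method `forBroker` are hypothetical, and the string keys are the plain-text equivalents of the `ConsumerConfig` constants used in the post.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the original post): builds the consumer
// settings from a broker address supplied by the caller instead of a
// hard-coded "127.0.0.1:9092".
final class ConsumerProps {

    private ConsumerProps() {
    }

    static Map<String, Object> forBroker(String bootstrapServers, String groupId) {
        Map<String, Object> properties = new HashMap<>();
        // Address comes from the caller, e.g. the embedded broker in a test.
        properties.put("bootstrap.servers", bootstrapServers);
        // Read from the start of the topic when the group has no committed offset.
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", groupId);
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
```

The resulting map could then be handed to `new DefaultKafkaConsumerFactory<>(...)` in place of the map built inside `consumerFactory()`, so that production code and the test differ only in the address they supply.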
The embedded broker is:
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1, false, 1, "TEST_TOPIC")
            .kafkaPorts(9092)
            .brokerProperty("listeners", "PLAINTEXT://localhost:9092");
}
Here is the producer config:
public Properties getKafkaProperties() {
    final Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return properties;
}

public Future<RecordMetadata> sendMessage(String TEST_TOPIC, String message) {
    KafkaProducer<String, String> producer = new KafkaProducer<>(getKafkaProperties());
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, "Key", message);
    return producer.send(producerRecord);
}
When I send a message, it is sent to the topic successfully, but Kafka.messageDrivenChannelAdapter does not pick it up.
I have tried many things but could not find the root cause.
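Because `sendMessage` returns a `Future`, a test can only be sure the send succeeded once that future has completed. As a rough sketch of how a test might confirm this before checking the listener side, the helper below blocks on a `Future` with a timeout. The class name `SendAwaiter` and the method `await` are hypothetical and not part of the original code; the helper uses only `java.util.concurrent` and would work with any `Future`, including the `Future<RecordMetadata>` returned above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper (not from the original post).
final class SendAwaiter {

    private SendAwaiter() {
    }

    // Blocks until the future completes or the timeout elapses, and turns
    // every failure mode into an unchecked exception with a clear message.
    static <T> T await(Future<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the send result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No send result within " + timeoutMillis + " ms", e);
        }
    }
}
```

In a test this might be called as `SendAwaiter.await(sendMessage("TEST_TOPIC", "hello"), 10_000)`; if that call returns, the broker has acknowledged the record, which narrows the problem down to the consuming side.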