试图通过新的Elasticsearch Java API客户端索引kafka有效载荷时,listereRexecutionfailedException nullpointer nullpointer
我正在从HLRC迁移到新客户端,情况很顺利,但是由于某些原因,我无法索引特定的类/文档。这是我的客户端实现和索引请求: @Configuration public clas…
如何将清理添加到Spring Application.AYML?
是否可以添加Spring Application.yaml“ callup.policy”和“ log.cleaner.max.compaction.lag.ms”以配置kafka compact?如果可能的话,如何? 现在…
根据字段值,在kafka流中正确的flatmapvalues拆分单消息
需要一些指导wrt kafka流分裂。 我的消息值字段类似此 {"name": "val1", "role": "val2"} 消息的键是一个字符串字段,我们在这里不必担心。 在 name …
在批处理模式下弹簧云流中的错误处理
我正在使用Spring Cloud Stream和Kafka活页夹中的批次批量消费消息。我正在尝试实施错误处理机制。根据我的理解,我无法在批处理模式下使用Spring Clo…
kafka |确切的一方面消费者不止一次消耗消息
在我们的应用程序中,在生产者和消费者中都完全启用了。 生产者是python组件。我们已经启用了: iDempotence 使用交易(每次发送消息时每次使用新的Tr…
弹簧靴kafkalistener忽略内部捕获
我与kafkalistener一起使用Spring Boot,然后尝试治疗内部异常(约束VioLationException),但我不知道为什么内部尝试捕获不起作用, 具有关注代码, …
Spring Kafka-批处理处理不起作用
我有Spring Kafka消费者,我想每60秒消耗50张记录。我转介了几个文档,并配置了我的应用程序,例如> 消费者配置 @Bean public ConsumerFactory co…
在Springboot测试中禁用Kafka连接
我正在按照微服务体系结构进行Springboot项目,并使用Kafka作为活动总线来交换其中一些数据。我还进行了JUNIT测试,这些测试测试了我的应用程序的某些…
Spring Kafka @kafkalistener可以使用@Sendto(阅读和回复)与延期结果一起使用吗?
我需要这样的东西 @SendTo @KafkaListener(topics = "some") public Future getBatchInfo(String batchCode){ ...} 可以弹出kafka请求 - 重新机制可与…
为什么弹簧云2021.0.3需要其他配置?
我已经从 greenwich.sr1 升级为 2021.0.3 ,应用程序开始失败并显示以下说明: *************************** APPLICATION FAILED TO START **********…
KAFKA MSK-高fetch.max.wait.ms and fetch.min.bytes的配置出乎意料
我有一个在春季应用程序上运行的Kafka消费者。 我正在尝试使用 fetch.max.wait.ms 和 fetch.min.bytes 。 我希望消费者等到有15000000字节的消息或1分…
如何从application.yaml加载kafka消费者或生产者属性?
目前,我有此代码用于创建 concurrentkafkafkalistenercontainerfactory 。 @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerCon…