如何有条件地从 Kafka 主题轮询消息
我在 MongoDB 数据库中有一些任务通知。每个任务都有一个 due_date 和提醒标志。我正在将这些任务推送到 Kafka 主题。有一个 Node JS 应用程序从该主…
Spring Kafka 手动提交在 Batch 监听模式下如何工作
我有一个有 2 个分区的主题。我在我的消费者应用程序中使用 kafka 批处理侦听器模式。由于我使用的是单个消费者应用程序,因此我将从两个分区接收消息…
负载测试服务:Kafka Consumer、Aerospike Reader
我有一个具有以下架构的服务。 它使用来自一个 Kafka 主题的消息。 它从 Aerospike 读取记录。 它将丰富的消息放入另一个 Kafka 主题。 这是架构。 我…
无法打印来自kafka消费者的数据(基于日志的变更数据捕获项目)
我有一个项目,使用kafka、zookeeper、debezium从mysql数据库捕获数据更改。实际上,我对kafka很陌生,我只是遵循debezium教程,所以在尝试使用kafka-…
Kafka Connect 服务失败且没有任何日志
我使用 Kafka 连接与 Kafka bin 版本 2.7.0, 连接服务以独立模式运行并配置了 SSL, 由于内存不足问题(导致我们增加分配的内存)导致服务失败后,Ka…
在 Kafka 中存储每小时数据 |为现有系统设计消费者的问题
我必须在现有系统中用 Kafka 替换 Couchbase。在现有系统中,文档被写入 Couchbase,Java 程序从中读取数据,聚合内存中的数据,并在一小时结束时将数…
Kafka jdbc接收器连接器创建与原始数据类型不匹配的数据类型
我正在使用 Kafka 和 Kafka Connect 将 MS SQL Server 数据库复制到 MySQL,使用 debezium sql server CDC 源连接器和汇合 JDBC 接收器连接器。 “aut…
类路径为空。请首先构建项目,例如运行“./gradlew jar -PscalaVersion=2.13.6”
我已经进行了很多搜索来解决这个问题,但没有任何效果对我有用。 我正在尝试在我的 Linux 机器(Mint)上安装 Kafka,并且已经到达启动 Kafka 代理的…
使用我想要拆分并发送到两个不同主题的 kafkastreams 使用 json
我有一个来自 kafka 主题的大型 json,我正在将其转换为 Java 对象,以便在查找数据库中提取我需要的值。有些记录中会有一系列缺陷,我需要捕获这些缺…
有没有办法用 Kafka 消费中的最新消息替换旧消息(避免最终 df 中重复)
我正在使用来自某个主题的数据,正如我们所知,我们实时获取数据,其中我们看到重复的元素,如何实际上用最新消息替换旧消息。 我使用以下相同的代码…
application.yaml 中的 Spring Boot Kafka StreamsConfig 或 ConsumerConfig 不适用
我有一个非常简单的带有 KTable 的 spring boot 项目,我想在 application.yml 中自定义我的配置,但该配置似乎没有被应用。这是我的配置文件 applica…
Kafka的 Producer.flush() 是否验证所有数据都发送到主题?
我每天通过 Pythonic KafkaProducer 向 Kafka 主题发送大约 1200 万行数据。 数据需要为JSON格式,每行为1个条目, 每 100 万行执行一次 Producer.flu…