有没有办法用 Kafka 消费中的最新消息替换旧消息(避免最终 df 中重复)
我正在使用来自某个主题的数据,正如我们所知,我们实时获取数据,其中我们看到重复的元素,如何实际上用最新消息替换旧消息。 我使用以下相同的代码…
application.yaml 中的 Spring Boot Kafka StreamsConfig 或 ConsumerConfig 不适用
我有一个非常简单的带有 KTable 的 spring boot 项目,我想在 application.yml 中自定义我的配置,但该配置似乎没有被应用。这是我的配置文件 applica…
Kafka的 Producer.flush() 是否验证所有数据都发送到主题?
我每天通过 Pythonic KafkaProducer 向 Kafka 主题发送大约 1200 万行数据。 数据需要为JSON格式,每行为1个条目, 每 100 万行执行一次 Producer.flu…
Strimzi KafkaConnect &连接器错误,无法加载
我不知道还能去哪里,因为我看到的所有例子我都复制了很多,但仍然无法让它工作。连接器将不会安装并显示空密码。我已经验证了每一步,但无法使其发挥…
KSQL 查询检查值完整性
组成的流 我有一个由以下示例值、 correlation_id 和 event_type 示例 aud-103触发器 aud-104触发器 aud-109 缓解措施 aud-103 缓解 如果检测到具有相…
卡夫卡+ Kafka 代理之间或代理与客户端之间可能打开的文件过多
我们的 Apache Kafka 服务器面临着非常大的问题 我们的 Kafka 服务器之所以增加,是因为“打开文件太多”, 我们的生产 Kafka 集群有 7 台机器,而 Ka…
如何删除以编程方式创建的kafka主题
我正在以编程方式创建 Kafka 主题。当应用程序结束时。我必须删除创建的 Kafka 主题。我正在使用 adminClient.deleteTopics(TOPIC_LIST) 但主题仍然没…
Spring Cloud Kafka Stream:使用 Avro 发布到 DLQ 失败
在使用 ErrorHandlingDeserializer 与 Avro 组合处理错误时,我无法发布到 Dlq 主题。以下是发布时的错误。 主题 TOPIC_DLT 在 60000 毫秒后不存在于…
Kafka 3 默认 state.dir 的 AccessDeniedException
当我们在 Windows 10 上运行 Kafka 时,每 5 秒就会出现以下错误。 无法将全局存储的偏移检查点文件写入 C:/tmp/kafka-streams/user/global/.checkpoi…
Consumer不会从topic中获取新记录,并且每次启动时总是指向相同的偏移位置
我正在使用 spring-kafka 2.3.7 问题 1: 系统中有两个消费者 (KafkaListeners)。当应用程序启动其中一个消费者时,总是指向相同的主题/分区/偏移量。…
Strimzi Kafka 使用本地节点存储
我正在 kubernetes 上运行 kafka(部署在 Azure 上),使用 strimzi 作为开发环境,并且更喜欢使用内部 kubernetes 节点存储。如果我使用 persistant-…
提取结构化二进制数据(从 Kafka)并转换为 Integer
我正在尝试使用 Python 从 Kafka 将数据提取到 Spark 中。来自 Kafka 的数据采用 JSON 格式,如下所示: {"order_id": 56, "customer_id": 772, "taxf…
Spring kafka setErrorHandler 已弃用替换(引导 2.6.4)
在 Spring Boot 2.6.4 上,此方法已被弃用。 public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListe…
在 EC2 上运行 kafka 消费者
我创建了 MSK 集群,并从我的 Producer 发送了一条消息。 效果很好。 而且,我想让我的消费者得到这个消息。 Consumer在一个python文件中,而python文…