卡夫卡+分区副本如何大于配置的 ReplicationFactor
我们在 Linux 机器上有 Kafka 集群,有 11 个代理, 我们有主题 - my_first_car ,有 300 个分区,并且这个主题配置了复制因子 = 3 我们从 kafka-topi…
codec.format 中的 %{[offset]} 是什么意思?
我对 filebeat.yml 文件有疑问。 在我的 filebit.yml 设置中,输入是 log,输出是 kafka。 当如下使用时,%{[offset]}的含义是否表示日志文件的偏移量…
Spring Kafka Consumer 跳过一些偏移量
我们有一个非常简单的 Kafka Consumer(v 2.6.2)。 它是消费者组中唯一的消费者,并且该组是唯一从主题读取的消费者(有 6 个分区,其中约有 300 万…
Spring Cloud Kafka Stream StreamsUncaughtExceptionHandler
我正在尝试将 StreamsUncaughtExceptionHandler 添加到我的 Kafka 流处理器中。该处理器是用 Kafka 函数编写的。我查看了 Artem Bilan 提供的建议,将…
Kafka docker - NoSuchFileException:/opt/kafka.server.keystore.jks
我已经通过docker安装了kafka。 当我运行 docker-compose up 命令时,我遇到以下错误: [2022-02-28 08:13:24,185] INFO Awaiting socket connections…
连接到在 Docker 中运行的 Kafka
我在本地计算机上设置了一个单节点 Kafka Docker 容器,如 Confluence 文档(步骤 2-3)。 此外,我还公开了 Zookeeper 的端口 2181 和 Kafka 的端口 …
使用 Kafka TimestampConverter 微秒?
我是 Kafka/Avro 的新手。我的下游数据库 (Postgres) 有一个 timestamptz 列。我的上游数据库 (Materialize) 为该列生成以下 Avro 模式: "type": [ "…
Fineract-CN 中将 ActiveMQ 替换为 Kafka
Fineract-CN有人用Kafka替换ActiveMQ吗? 我想在Fineract-CN模块中使用Kafka,但是Spring Boot版本目前是1.4.1,不可能。…
Python KafkaConsumer 将 json 输出(回复)重定向到文件
我正在 Python 中使用 Kafkaconsumer 消费 Kafka 消息。 我从 Kafka 得到的回复是 json 格式。我想知道如何使用从 Kafka 获得的回复创建 json 文件。 …
来自 Kafka 消息的 Websocket
我正在开发一个物联网项目,该项目使用 MQTT 协议将传感器数据从嵌入式设备传输到应用程序。为此,我创建了 一个 MQTT 代理来从设备发送数据。 一个自…
Kafka Streams:java.lang.IllegalArgumentException:VoidDeserializer 的数据应为 null
我正在处理我的 Kafka Streams 的第一个示例: package com.example; import java.util.Properties; import org.apache.kafka.clients.consumer.Consu…
如何在需要时打开和关闭kafka读取消息
我有一个从 kafka 读取消息的 spring 应用程序,我希望当我启用维护时,我的应用程序将不再从 kafka 读取消息,当我关闭维护时,它将恢复正常读取…