如果在处理过程中抛出了例外,Kafka-streams是否会承诺偏移?
我有一个KAFKA流应用程序,并且创建了自定义生产/消费/未接收的异常处理程序,尽管例外,所有返回继续继续进行处理。在这种情况下,kafka流是否承担偏…
结构化流 - 从gke上读取strimzi kafka,每10分钟将数据写入mongo
我在KAFKA主题中有数据(数据每10分钟发布一次),我计划使用Apache Spark结构化流(批处理模式)读取此数据,然后将其推入MongoDB。 请注意: 这将在…
处理MySQL中多个表的单个KAFKA主题消息
我正在阅读从SFTP连接器到KAFKA主题“ topic1”的CSV文件。 每个记录都包含许多列,这些列需要存储在MySQL数据库中的不同表中。 例如: 每条消息看起…
如何自定义ID生成@kafkalistener?
我需要能够根据 @kafkalistener 的每个方法自定义ID,并基于其属性之一的值以及 application.yaml 中定义的值。 ,例如: 类似方法的方法的课程。 @Ka…
在本地Windows机器上运行Kafka
我正在通过在Windows 10机器(JVM)上运行KAFKA来尝试Kafka。卡夫卡(Kafka)和Zookeeper一起工作正常。我还试图运行Kafka连接到Docker内部运行的Post…
与源相比,通过Kafka插入的记录均不得
我正在使用Snowflake-Kafka连接器。在其中包含3个KAFKA服务器群集的位置。这具有分布式连接器,该连接器在Zookeeper和Kafka服务上方执行。最重要的是…
如何按主题从模式注册表中获取AVRO模式?
我在Localhost:8083上有架构注册表,并希望以其他地方使用AVRO模式“ test.data”。 当我尝试时, $ curl --location --request GET 'http://localho…
在Kubernetes中部署Kafka-UI有502个坏网关
我试图在本地的kubernetes群集中部署Kafka-UI,但是Ingress-nginx给出了502个错误(不良网关)。我使用以下配置: 部署: apiVersion: apps/v1 kind: …
Micronaut Kafka听众:如何以15分钟的间隔安排Kafka Consumer Poll()
我想每15分钟对该话题进行轮询。我发现有一种方法可以在Kafka-spring中使用ConcurrentkafkalistenerconcontainerFactory进行操作,而以下配置 factory…
来自Timestamp的两个不同KAFKA主题的Spark聚集事件
假设一个具有以下两个主题的Kafka系统: 创建的 删除, 它们用于宣传项目的创建和删除。 Kafka中事件的结构是JSON,这两个主题都是相同的: { "id" : …
春季kafka与春季JPA
我有两个微型服务A和B。 一项服务正在向Kafka主题“ A-Topic”发送消息。 B服务正在消耗消息。 在B服务中,Kafka侦听器将执行以下步骤, 1. Persist t…
为每个kafka消息致电远程REST API
我正在Kafka生产商中读取文件(逐行),并在流中读取每条记录(即每行),而Kafka消费者正在收到此记录。Further我需要将其发送给KAFKA消费者的REST A…
KAFKA架构注册表 - 屏蔽邮件在Kafka的架构注册表中接受的消息
我在Kafka有一个模式,我需要每次在此主题中发布帖子时,我注册的的模式就会检查是否处于相同的模式正在发送的模式。 我的模式是: curl帖子: curl -…