在Active-Passive Kafka群集体系结构中创建Kafka主题
我试图弄清楚在Active-Passive Kafka群集体系结构中创建主题的正确方法是什么。 假设我们在两个分离的可用性区域中有两个Kafka群集(主动 - 假)。 我…
从spark-submit中获取参数值为Intellij Spark Scala的文件之一
spark-submit --master yarn --deploy-mode cluster --num-executors=8 --executor-memory=4G --driver-memory=10G --conf spark.yarn.submit.waitApp…
如何从application.yaml加载kafka消费者或生产者属性?
目前,我有此代码用于创建 concurrentkafkafkalistenercontainerfactory 。 @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerCon…
KAFKAPRODUCER-获取错误以连接到Kafka(60.0秒后无法更新元数据)
我正在尝试阅读Oracle的数据并发送到Kafka主题。我能够从Oracle阅读,将其放入数据框中,然后在下面的代码中显示有关Kafka的所有参数,但是我遇到了错…
KAFKA的通知引擎发送推送通知从Android应用中的不同用户获取ID
我正在设计一个通知引擎(基于打字稿),该引擎将用作单一停车商店,用于其他各种系统,以将通知发送到不同的频道(推送通知,SMS,电子邮件等)。我…
kafka connect protobuf转换器找不到
我有一个Kafka S3-sink连接器,我正在生产消息。但是,当我将消息发送到主题时,我注意到我的水槽连接器的日志给我带来了一个错误: │ Invalid value…
如何将无界的水表发送到Kafka水槽?
我正在使用表API创建两个流,让我们称其为A和B。使用ExecutesQL我正在加入两个表。输出为表格。我想将连接的结果发送到Kafka水槽。请在下面找到代码。…
Kafka消费者如何强制刷新元数据以发现新主题
我正在使用Regex模式来订阅一组主题,这些主题可能是动态创建的。但是,消费者发现新创建的主题可能还有很长一段时间。 我可以设置 topic.metadata.re…
Google DataProc作业与本地Keytab / TicketCache文件一起提交
我正在尝试提交一个数据ploc作业,该作业将消费来自Kerberized Kafka群集的数据。 当前的工作解决方案是在计算机上具有JAAS配置文件和键盘,该文件正…
聚合&与Kafka流分组
我正在Kafka流和州商店工作。我有以下逻辑,可以根据密钥汇总消息组。 final Materialized> abcStore = Materialized.
如何从S3接收器中的信封类型架构中提取嵌套字段
AVRO架构: { "type": "record", "name": "Envelope", "namespace": "test", "fields": [ { "name": "before", "type": [ "null", { "type": "record"…
如何将偏移窗口应用到翻滚窗口,以延迟Windows< timeWindow>在Kafka流中
我正在使用不同的窗口尺寸在2022年5月的数据集上计算一个简单的均值。使用1小时的窗户没有问题,使用1周零1个月的窗口,记录未正确评估。 如所讨论的…
Apache Kafka消息被存档 - 是否可以检索消息
我们正在使用Apache Kafka,我们每天处理超过3000万条消息。我们的保留政策为“ 30”天。但是,在30天之前,我们的消息被存档了。 有什么方法可以检索…
春季云流kafka commitfailedexception
我使用了spring-cloud-starter-stream-kafka 3.0.4.Release,我有以下错误: org.apache.kafka.clients.consumer.CommitFailedException: Commit cann…
apache kafka经常连接并关闭nodejs中的连接
,我正在使用 kafka-node npm软件包使用kafka,并且我的自定义连接检查流 client.on("close", () => { console.log("kafka close"); }); client.on("c…