Flink 1.15计划器装载机Arithemeticutils NoclassDeffoundError问题
当将Flink 1.15与新的Planner Loader模块一起使用提供的范围时,我将获得arithmeticutils类的NoclassDeffoundError,我明确地导入了它,但是它没有起…
暂停/恢复在迁移过程中
我正在使用 apache flink 来从给定的一组 kafka 主题传播到 elasticsearch cluster中。 我面临的问题是,有时 alasticsearch 群集发展,我必须(1)修…
将Flink Kafka与模式注册表集成
我们正在使用汇合平台进行Kafka部署。我们正在使用模式注册表来存储模式。是否可以将模式注册表与Flink集成?如何从Confluent平台读取AVRO格式的数据…
从kafka阅读时,请使用keyby vs reinterpretaskeyedstream()
我有一个简单的Flink流处理应用程序(Flink版本1.13)。 Flink应用程序读取了Kakfa,对记录进行了陈述的处理,然后将结果写回Kafka。 从Kafka主题阅读…
ParquetPrototowriter创建一个不可读的镶木木材文件
我的.proto文件包含一个地图类型字段。 Message Foo { ... ... map fooMap = 19; } 我正在消费来自Kafka源的消息,并试图将消息写入S3存储桶中的镶木q…
如何访问Flink Map中在外面声明的Java变量?
我正在Java创建列表。我想在Flink中的地图功能中分享列表。如何在跨链接过程中共享变量 要求 我有静态数据记录(少于1000个记录)。我想将这些记录与…
程序运行后的一段时间后,Flink释放任务管理器
提交Flink程序后,它将运行一段时间。之后,我 org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.util.FlinkException…
是否可以将不是密钥的列分组,并将汇总结果附加到Apache Flink/Beam中的原始记录
假设我的数据如下… Col1 Col2 Col3 Col4 A ABC 101 1 B ABC 102 1 C ABCD 101 1 D ABCD 101 1 E ABC 101 1 我想要Groupby col2 col2 col3 and sum(…
Apache Flink Kafka avro从主题中序列化流,具有多种事件类型(架构注册表中的主题为主题的主题编号))
我已经写了一个消费者来阅读主题并构建通用记录的数据流,但是该主题包含带有不同模式的消息,因为我们在架构注册表中使用该主题的主题recordNeveStra…
在Flink中配置每个插槽的核心使用情况
我有一个 3 机器的集群,每个机器都有 4 core。每台机器都有一个任务管理器。我知道可以通过 taskmanager.numberoftaskslots 来控制flink中的插槽数。…
flinkkafkaconsumer:由:org.apache.kafka.common.errors.TimeOutexception:请求计时发布
Getting "org.apache.kafka.common.errors.TimeoutException: The request timed out" in Flink kafka consumer. 2022-06-06 08:57:24,044 warn org…
java.lang.nosuchmethoderror:' org.apache.flink.metrics.metricgroup org.apache.flink.api.common.functions.runtimecontext.getmetricgroup()'
我是Flink的新手。我正在编写一个Flink应用程序(在Java中),该应用程序消耗了Kafka主题中的数据。我在本地计算机上执行此操作(Apache Kafka 2.13-3…
如何通过编程方式检查Flink任务是否是背压
使用Flink 1.11。我需要确定是否面对弯曲任务。使用WebUI,我们可以监视背压状态。如果特定任务面向背压,是否有任何方法可以检查Flink应用程序吗?…