使用估值导致事件处理滞后的巨大检查点大小
我在Flink中有一个应用程序,该应用程序会重复数据删除多个流。 它可以在一个字符串字段上进行键,并使用值使用值进行删除。 在RichFilterFunction中…
如果我们通过SavePoint取消工作,Job被取消了,SavePoint是失败如何现在还原这项工作
我已经使用本机Kubernetes部署并与SavePoint一起在应用程序模式下部署了Flink作业(我正在使用REST API命令),但是如果它未能完成保存点,则可以为该…
Apache Flink-将流写入S3错误-NULL URI主机
我有一个flink数据管道,该管道将下载的日志文件从S3转换,并以Parquet文件格式写回到另一个S3存储桶中。我配置了S3键& flink-conf.yaml 中的Sec…
Flink检查点没有重播在SavePoint/Checkpoint期间正在进行的KAFKA事件
我想在Flink中精确测试端到端。我的工作是: kafka -source-> mapper1-> mapper -2 - > kafka-sink 我在mapper1中放了一个 thread.sleep…
Apache Flink -Java.lang.noclassdeffounderror:org/apache/flink/api/connector/connector/sink2/sink2/sink
我写了一条flink管道,该管道写入数据流以镶木式格式提交。我使用 sinkto 方法将输出写入文件。当应用程序启动时,我会得到以下例外。 java.lang.Runt…
根据Kafka -Apache Flink中的路径读取S3文件
我有一个管道,可以听到一个接收S3文件名的Kafka主题&小路。管道必须从S3读取文件,并进行一些转换&聚合。 我看到Flink具有直接读取S3文件…
如何从DataStream Scala+ apache flink
我正在从Confluent的Kafka主题中获得AVRO的回应,当我想应对响应时,我正面临问题。不了解我应该如何定义avro deserializer并在阅读时使用在kafka源中…
在Flink中连接多个表格时,结果不一致
我们有4个定义的CDC来源,我们需要将数据组合到一个结果表中。我们使用SQL API为每个源创建一个表:例如: "CREATE TABLE IF NOT EXISTS PAA31 (\n" +…
我在运行以下pyflink代码时会遇到此错误
这是使用Apache Flink(Pyflink)从KAFKA源中计算每个CH [X]平均值的代码 我想我已经导入了所有必要的库 ,并且在运行代码 from numpy import average…
Flink JDBC沉入多个模式
我正在使用Flink JDBC接收器将数据推入Postgres表中。数据必须存储在具有相同数据库连接的不同的模式中。 DataStream stream = env.fromSource(...); …
与我正在做的时间窗口相比,有没有更好的方法来加入两个键流的更新?
因此,我拥有的是一项Flink作业,该作业会从Kafka接收一条消息,并将创建两个然后通过作业发送的updateObject事件。为了简单起见,一个是增加事件,另…
flink SQL流媒体查询的状态大小和检查点大小与TVF聚合保持增长
我们正在使用Flink SQL来定义流处理管道,该管道在5分钟的固定窗口上计算总和聚集。查询看起来像这样: INSERT INTO BigTableTable SELECT CONCAT_WS(…
使用Zookeeper HA的Flink群集总是关闭:[接收信号15:Sigterm]
环境: Flink1.14.4 Kubernetes中的独立应用模式 根据官方步骤中的 : flink群集: https://nightlies.apache.org/flink/flink/flink/flink/flink-doc…
从Python中的Tweets Kafka生产商创建Pyflink DataStream消费者
我想在Pyflink中创建我流kafka消费者,该kafka消费者可以在DeSerialization(JSON)后读取推文数据, 我有PYFLINK版本1.14.4(最后一个版本) 我可以…