“无法执行应用程序”适用于从 Kinesis Analytics Studio 部署的流应用程序
我有一个Kinesis Analytics Studio笔记本电脑运行良好,并且我正在尝试通过遵循步骤在这里(控制台的步骤),但我遇到了一些问题。该笔记本包含一个需…
Flink Table API在流模式下加入表
我已经在我的Flink应用程序中注册了两个JDBC表,并希望加入它们并将结果转换为常规数据流。 但是,当我加入表格时, 在线程“ main” org.apache.flin…
如何使用 KeyedProcessFunction 实现在 apache flink 中进行多个窗口聚合?
我想扩展下面的窗口聚合以计算更高的窗口聚集。 我的下部窗口聚合使用了keyedProcessfunction,并且实现了OnTimer,以便在窗口末端将数据冲入水槽中。…
对 WordCount 输出进行排序 Flink
我正在尝试学习 Flink,并且正在做基本的 WordCount 教程。我想知道如何对数据流的输出进行排序,以便它按降序输出计数。我不需要将其保存为文本文件…
Flink通过表DSL创建表
为了创建表,我使用 SQL 语法,例如 val tableEnv = StreamTableEnvironment.create(env, settings) tableEnv.executeSql( "CREATE TABLE asset (smth…
Flink -Postgres CDC Connnector-自定义查询
我正在开发 Flink 应用程序,以 Postgres DB 作为源来读取某些配置数据,将其转换为数据流,然后将其与传入的实时数据流连接起来。 我尝试过使用 Post…
在接收非重要类型的数据流(使用PYFLINK)的数据流时,如何在Flink SQL中指定表格?
Flink SQL 应用程序从 AWS Kinesis Data Stream 接收数据,其中接收到的消息采用 JSON 格式,架构以 JSON Schema 表示,并且包含一个不是原始对象的属…
从Azure Blob存储中读取连续的镶木材料文件作为dataStream
我想读取具有格式 yyyy-mm-dd/hh/mm/file_name.parquet 的目录中每秒在Azure Blob存储中生成的parquet文件。 我想将数据读为datastream。 我们是否已…
aws emr cluster flink不运行任何罐子,而是给出java.lang.nosuchmethoderror
我尝试使用以下命令提交一个 jar 作为 AWS EMR 中的一个步骤: aws emr add-steps --cluster-id j-XXXXXXXXX --steps Type=CUSTOM_JAR,Name=test-job,…
Flink KeyedCoProcessFunction 处理状态
我使用 keyedCoprocessfunction 函数用数据丰富了主数据流,来自另一个流 代码: class AssetDataEnrichment extends KeyedCoProcessFunction[String,…
FLINK SQL:row.getFieldsAs 返回 LocalDateTime 而不是时间戳?
Flink:1.13.2 我有一个 StreamTableEnvironment tableEnv 从 KafkaSource 读取流数据。 从这个 tableEnv 中,我过滤数据并将其转换回 DataStream。 D…
Flink-如何在flink sinkfunction中查找java包下的所有类
如何在flink sinkfunction中查找java包下的所有类?在 SinkFunction 中运行这些代码时我什么也没得到。请参阅以下代码 public Set findAllClassesUsin…
从数据库查询结果创建 Flink DataStream
在我的问题中,我需要查询数据库并将查询结果与 Flink 中的 Kafka 数据流连接起来。目前,这是通过将查询结果存储在文件中,然后使用 Flink 的 readFi…