Flink Table API: joining tables in streaming mode


I have registered two JDBC tables in my Flink application and want to join them and convert the result into a regular DataStream. But when I join the tables I get this error:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])

Code

    val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
      .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
      .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
      .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
      .option(JdbcConnectorOptions.TABLE_NAME, "asset")
      .schema(Schema.newBuilder()
        .column("owner_id", DataTypes.STRING)
        .column("id", DataTypes.STRING)
        .column("poi_id", DataTypes.STRING)
        .column("gateway_id", DataTypes.STRING)
        .column("internal_status", DataTypes.STRING)
        .build())
      .build()

    val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
      .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
      .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
      .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
      .option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
      .schema(Schema.newBuilder()
        .column("owner_id", DataTypes.STRING)
        .column("asset_id", DataTypes.STRING)
        .column("tag_id", DataTypes.STRING)
        .column("role", DataTypes.STRING)
        .build())
      .build()

    tableEnv.createTemporaryTable("asset", assetTableDescriptor)
    tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)


    val assetTable: Table = tableEnv.from(assetTableDescriptor)
      .select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")

    val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
      .select($"owner_id", $"asset_id", $"tag_id", $"role")

    val assetAssociationTable = assetTable
      .leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
      .select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")

    val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
      .toDataStream(assetAssociationTable, classOf[JdbcAssetState])
      .flatMap(new JdbcAssetStateDataMapper)

In BATCH mode it works fine, but I need to join assetTableStream with another stream in my app in STREAMING mode.
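
For context, the exception means the streaming left outer join produces an updating changelog (updates and deletes), which toDataStream cannot consume but toChangelogStream can. A minimal sketch, assuming the same tableEnv and assetAssociationTable as above:

    // Sketch only: consume the updating join result as a changelog stream
    // instead of an insert-only stream.
    import org.apache.flink.api.common.functions.FilterFunction
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.types.{Row, RowKind}

    val changelog: DataStream[Row] = tableEnv.toChangelogStream(assetAssociationTable)

    // Each Row carries a RowKind; downstream logic must handle retractions,
    // e.g. keep only inserts and update-afters when upsert semantics suffice.
    val upserts: DataStream[Row] = changelog.filter(new FilterFunction[Row] {
      override def filter(row: Row): Boolean =
        row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER
    })

Note that the JDBC connector is a bounded scan source, so even in STREAMING mode the join result stops changing once both tables have been read.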

Based on what I found in the Flink docs, it looks like I need to use a lookup join, but I cannot figure out how to do that with the Table API (not SQL).
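
The Table API has no dedicated lookup-join operator; lookup joins require the SQL-only FOR SYSTEM_TIME AS OF syntax, so the usual workaround is to express just that join with sqlQuery against the registered asset table. A hedged sketch, where mainStream and its asset_id column are placeholders for the application's other stream:

    // Sketch only: a lookup join needs a processing-time attribute on the
    // probe side; mainStream is a placeholder for the app's own DataStream.
    tableEnv.createTemporaryView(
      "main_stream",
      tableEnv.fromDataStream(
        mainStream,
        Schema.newBuilder()
          .columnByExpression("proc_time", "PROCTIME()")
          .build()))

    // FOR SYSTEM_TIME AS OF turns the join into per-record point lookups
    // against the JDBC table instead of a one-off full scan.
    val enriched: Table = tableEnv.sqlQuery(
      """
        |SELECT s.*, a.poi_id, a.gateway_id
        |FROM main_stream AS s
        |JOIN asset FOR SYSTEM_TIME AS OF s.proc_time AS a
        |  ON s.asset_id = a.id
        |""".stripMargin)

Each incoming record then triggers a keyed query against MySQL at processing time; the JDBC connector's lookup cache options can be set on the descriptor to reduce that load.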

Any small example of joining two JDBC tables and converting the result into a DataStream would be fantastic.


Comments (1)

倥絔 2025-01-27 04:47:27


Two hints:
