Joining tables in streaming mode with the Flink Table API
I have registered two JDBC tables in my Flink application and want to join them and convert the result into a regular DataStream. But when I join the tables, I get this error:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
Code
// Imports assumed for this snippet (Scala Table API bridge); the
// JdbcConnectorOptions package can vary across Flink versions.
import org.apache.flink.connector.jdbc.table.JdbcConnectorOptions
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
  .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
  .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
  .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
  .option(JdbcConnectorOptions.TABLE_NAME, "asset")
  .schema(Schema.newBuilder()
    .column("owner_id", DataTypes.STRING())
    .column("id", DataTypes.STRING())
    .column("poi_id", DataTypes.STRING())
    .column("gateway_id", DataTypes.STRING())
    .column("internal_status", DataTypes.STRING())
    .build())
  .build()

val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
  .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
  .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
  .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
  .option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
  .schema(Schema.newBuilder()
    .column("owner_id", DataTypes.STRING())
    .column("asset_id", DataTypes.STRING())
    .column("tag_id", DataTypes.STRING())
    .column("role", DataTypes.STRING())
    .build())
  .build()

tableEnv.createTemporaryTable("asset", assetTableDescriptor)
tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)

// Rename owner_id on the left side so the join condition below is unambiguous.
val assetTable: Table = tableEnv.from(assetTableDescriptor)
  .select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")
val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
  .select($"owner_id", $"asset_id", $"tag_id", $"role")

val assetAssociationTable = assetTable
  .leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
  .select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")

val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
  .toDataStream(assetAssociationTable, classOf[JdbcAssetState])
  .flatMap(new JdbcAssetStateDataMapper)
In BATCH mode this works fine, but I need to join assetTableStream with another stream in my app in STREAMING mode. Based on what I found in the Flink docs, it looks like I need to use a lookup join, but I cannot figure out how to do that with the Table API (not SQL). Any small example of joining two JDBC tables and converting the result into a DataStream would be fantastic.
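For reference, the lookup join mentioned above is expressed through SQL's FOR SYSTEM_TIME AS OF clause rather than a dedicated Table API call. A minimal hedged sketch under that assumption, where asset_events and its PROCTIME() attribute proc_time are hypothetical probe-side names and asset_tag is the JDBC table registered above:

// Hedged sketch: a lookup join needs a processing-time attribute on the probe
// side; "asset_events" and "proc_time" are hypothetical, "asset_tag" is the
// registered JDBC table.
val lookupJoined: Table = tableEnv.sqlQuery(
  """
    |SELECT e.owner_id, e.id, e.poi_id, e.gateway_id, t.tag_id, t.`role`
    |FROM asset_events AS e
    |JOIN asset_tag FOR SYSTEM_TIME AS OF e.proc_time AS t
    |  ON e.id = t.asset_id AND e.owner_id = t.owner_id
    |""".stripMargin)

Because the JDBC table is probed at processing time, the result is append-only for an append-only probe stream, so toDataStream can consume it without the changelog error above.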
Comments (1)
Two hints: use the toChangelogStream method (not toDataStream as it is in your example).
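A minimal sketch of that hint, reusing assetAssociationTable from the question; the RowKind filter below is illustrative, not part of the original code:

import org.apache.flink.types.{Row, RowKind}

// toChangelogStream keeps the update/delete rows the non-windowed left outer
// join emits, tagging each Row with a RowKind instead of failing the plan.
val changelog: DataStream[Row] = tableEnv.toChangelogStream(assetAssociationTable)

// Illustrative: keep only inserts and post-update images before further mapping.
val upserts: DataStream[Row] = changelog.filter { row =>
  row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER
}

Note that unlike toDataStream, toChangelogStream has no Class-based overload, so the mapping to JdbcAssetState would move into a downstream function operating on Row.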