How do I define a table in Flink SQL for a data stream containing non-primitive (nested) types, using PyFlink?
A Flink SQL application receives data from an AWS Kinesis Data Stream. The messages are JSON, their schema is expressed in JSON Schema, and the schema contains properties that are not primitive values (nested objects), for example:
```json
{
  "$id": "https://example.com/schemas/customer",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "properties": {
    "first_name": { "type": "string" },
    "last_name": { "type": "string" },
    "shipping_address": { "$ref": "/schemas/address" },
    "billing_address": { "$ref": "/schemas/address" }
  },
  "required": ["first_name", "last_name", "shipping_address", "billing_address"],
  "$defs": {
    "address": {
      "$id": "/schemas/address",
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "street_address": { "type": "string" },
        "city": { "type": "string" },
        "state": { "$ref": "#/definitions/state" }
      },
      "required": ["street_address", "city", "state"],
      "definitions": {
        "state": { "enum": ["CA", "NY", "... etc ..."] }
      }
    }
  }
}
```
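For reference: Flink SQL can represent a nested JSON object as an anonymous `ROW<...>` type, and Flink's `json` format deserializes a nested object into such a column. The sketch below is a hypothetical helper (`to_flink_type`, `resolve_ref` and `columns_ddl` are names invented for illustration, not part of PyFlink) that walks the schema above, follows the two kinds of `$ref` it uses, and emits a column list. It assumes string enums map to `STRING` (Flink SQL has no enum type), `integer` to `BIGINT` and `number` to `DOUBLE`; it is not a general JSON Schema resolver.

```python
from typing import Any, Dict, Tuple

# Assumed mapping from JSON Schema primitive types to Flink SQL types.
PRIMITIVES = {"string": "STRING", "integer": "BIGINT", "number": "DOUBLE", "boolean": "BOOLEAN"}


def _walk(resource: Dict[str, Any], pointer: str) -> Dict[str, Any]:
    """Follow a local JSON Pointer such as '#/definitions/state'."""
    node = resource
    for part in pointer.lstrip("#/").split("/"):
        node = node[part]
    return node


def resolve_ref(ref: str, root: Dict[str, Any],
                resource: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (target schema, schema resource that the target belongs to)."""
    if ref.startswith("#/"):
        # Pointer relative to the enclosing resource (e.g. inside "address").
        return _walk(resource, ref), resource
    # Otherwise look for a $defs entry whose $id equals the reference.
    for sub in root.get("$defs", {}).values():
        if sub.get("$id") == ref:
            return sub, sub
    raise KeyError(f"cannot resolve $ref {ref!r}")


def to_flink_type(schema: Dict[str, Any], root: Dict[str, Any],
                  resource: Dict[str, Any]) -> str:
    if "$ref" in schema:
        target, target_resource = resolve_ref(schema["$ref"], root, resource)
        return to_flink_type(target, root, target_resource)
    if "enum" in schema:
        return "STRING"  # assumes string enums; Flink SQL has no enum type
    kind = schema.get("type")
    if kind == "object":
        # A nested object becomes an anonymous (unregistered) ROW type.
        fields = ", ".join(
            f"`{name}` {to_flink_type(sub, root, resource)}"
            for name, sub in schema.get("properties", {}).items()
        )
        return f"ROW<{fields}>"
    if kind == "array":
        return f"ARRAY<{to_flink_type(schema['items'], root, resource)}>"
    return PRIMITIVES[kind]


def columns_ddl(root: Dict[str, Any]) -> str:
    """Column list for CREATE TABLE: one column per top-level property."""
    return ",\n".join(
        f"  `{name}` {to_flink_type(sub, root, root)}"
        for name, sub in root["properties"].items()
    )


# The customer schema from the question, abridged.
CUSTOMER_SCHEMA = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string"},
        "last_name": {"type": "string"},
        "shipping_address": {"$ref": "/schemas/address"},
        "billing_address": {"$ref": "/schemas/address"},
    },
    "$defs": {
        "address": {
            "$id": "/schemas/address",
            "type": "object",
            "properties": {
                "street_address": {"type": "string"},
                "city": {"type": "string"},
                "state": {"$ref": "#/definitions/state"},
            },
            "definitions": {"state": {"enum": ["CA", "NY"]}},
        }
    },
}

print(columns_ddl(CUSTOMER_SCHEMA))
```

For this schema the two address properties both come out as ``ROW<`street_address` STRING, `city` STRING, `state` STRING>``, while `first_name` and `last_name` stay plain `STRING` columns.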
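I can see in the documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types) that:

> Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog or referenced in a CREATE TABLE DDL.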
So if I cannot use CREATE TABLE to create an input table representing the stream my application receives, how should I handle that stream? Can I use Flink SQL at all?
NOTE: I need to write my application in Python.
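One possible direction, sketched under assumptions: the restriction quoted above concerns *registered* (named, catalog-stored) structured types, whereas an anonymous `ROW<...>` written inline in a column definition is accepted in `CREATE TABLE`. The snippet below only builds the DDL and query strings in plain Python; `build_create_table`, the stream name `customers-stream` and the region are hypothetical placeholders, and the `WITH` options assume the legacy `kinesis` SQL connector.

```python
from typing import Dict, List, Tuple

# Anonymous ROW type for the nested "address" object; the "state" enum is kept as STRING.
ADDRESS_ROW = "ROW<`street_address` STRING, `city` STRING, `state` STRING>"

CUSTOMER_COLUMNS: List[Tuple[str, str]] = [
    ("first_name", "STRING"),
    ("last_name", "STRING"),
    ("shipping_address", ADDRESS_ROW),
    ("billing_address", ADDRESS_ROW),
]

# Assumed options for the legacy 'kinesis' SQL connector; values are placeholders.
KINESIS_OPTIONS: Dict[str, str] = {
    "connector": "kinesis",
    "stream": "customers-stream",  # hypothetical stream name
    "aws.region": "us-east-1",     # hypothetical region
    "scan.stream.initpos": "LATEST",
    "format": "json",              # nested JSON objects map onto the ROW columns
}


def build_create_table(table: str, columns: List[Tuple[str, str]],
                       options: Dict[str, str]) -> str:
    """Hypothetical helper: render a CREATE TABLE statement with a WITH clause."""
    cols = ",\n".join(f"  `{name}` {sql_type}" for name, sql_type in columns)
    opts = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return f"CREATE TABLE `{table}` (\n{cols}\n) WITH (\n{opts}\n)"


CUSTOMERS_DDL = build_create_table("customers", CUSTOMER_COLUMNS, KINESIS_OPTIONS)

# Fields of a ROW column are reached with dot notation.
NESTED_QUERY = "SELECT c.first_name, c.shipping_address.city FROM customers AS c"

print(CUSTOMERS_DDL)
```

In a PyFlink job these strings would be handed to the table environment, e.g. `t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())`, then `t_env.execute_sql(CUSTOMERS_DDL)` and `t_env.sql_query(NESTED_QUERY)`. The Kinesis connector JAR has to be on the job's classpath, and connector option names vary between connector versions, so check them against the version you deploy.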