在接收非重要类型的数据流(使用PYFLINK)的数据流时,如何在Flink SQL中指定表格?

发布于 2025-01-19 06:00:52 字数 1398 浏览 5 评论 0原文

Flink SQL 应用程序从 AWS Kinesis Data Stream 接收数据,其中接收到的消息采用 JSON 格式,架构以 JSON Schema 表示,并且包含一个不是原始对象的属性,例如:

{
  "$id": "https://example.com/schemas/customer",
  "$schema": "https://json-schema.org/draft/2020-12/schema",

  "type": "object",
  "properties": {
    "first_name": { "type": "string" },
    "last_name": { "type": "string" },
    "shipping_address": { "$ref": "/schemas/address" },
    "billing_address": { "$ref": "/schemas/address" }
  },
  "required": ["first_name", "last_name", "shipping_address", "billing_address"],

  "$defs": {
    "address": {
      "$id": "/schemas/address",
      "$schema": "http://json-schema.org/draft-07/schema#",

      "type": "object",
      "properties": {
        "street_address": { "type": "string" },
        "city": { "type": "string" },
        "state": { "$ref": "#/definitions/state" }
      },
      "required": ["street_address", "city", "state"],

      "definitions": {
        "state": { "enum": ["CA", "NY", "... etc ..."] }
      }
    }
  }
}

我可以在 < href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types" rel="nofollow noreferrer">文档 那:

目前不支持注册的结构化类型。因此,他们 无法存储在目录中或在 CREATE TABLE DDL 中引用。

因此,如果我无法使用 CREATE TABLE 来创建表示我的应用程序正在接收的数据流的输入表,我应该如何处理数据流?我什至可以使用 Flink SQL 吗?

注意:我需要用 Python 编写我的应用程序。

A Flink SQL application receives data from an AWS Kinesis Data Stream, where the received messages are in JSON and where the schema is expressed in JSON Schema and which contains a property which is not a primitive object, for example:

{
  "$id": "https://example.com/schemas/customer",
  "$schema": "https://json-schema.org/draft/2020-12/schema",

  "type": "object",
  "properties": {
    "first_name": { "type": "string" },
    "last_name": { "type": "string" },
    "shipping_address": { "$ref": "/schemas/address" },
    "billing_address": { "$ref": "/schemas/address" }
  },
  "required": ["first_name", "last_name", "shipping_address", "billing_address"],

  "$defs": {
    "address": {
      "$id": "/schemas/address",
      "$schema": "http://json-schema.org/draft-07/schema#",

      "type": "object",
      "properties": {
        "street_address": { "type": "string" },
        "city": { "type": "string" },
        "state": { "$ref": "#/definitions/state" }
      },
      "required": ["street_address", "city", "state"],

      "definitions": {
        "state": { "enum": ["CA", "NY", "... etc ..."] }
      }
    }
  }
}

I can see in the documentation that:

Currently, registered structured types are not supported. Thus, they
cannot be stored in a catalog or referenced in a CREATE TABLE DDL.

So if I cannot use CREATE TABLE in order to create an input table representing the stream of data my application is receiving, how should I handle the stream of data? Can I even use Flink SQL at all?

NOTE: I need to write my application in Python.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文