How do I reference nested JSON in PyFlink SQL when the JSON schema varies?

Posted 2025-01-20 07:31:20

I have a stream of events I wish to process using PyFlink, where the events are taken from AWS EventBridge. The events in this stream share a number of common fields, but their detail field varies according to the value of the source and/or detail-type field. For example, here is an example event from EC2:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": " i-1234567890abcdef0",
    "state": "terminated"
  }
}

The id, version, source, etc. fields are consistent across event types, but note that a different type of event in the stream would have a different shape to the detail field; e.g., a CodeBuild event might look like this:

  "detail":{
    "build-status": "SUCCEEDED",
    "project-name": "my-sample-project",
    "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
    "additional-information": {
      "artifact": {
        "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
        "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
        "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
      }
     }
   }

I'd like to create statements like the ones below to key the stream by source / detail-type and feed different sub-tables, where each sub-table is processed differently.

INSERT INTO ec2_event_table SELECT * from input_table WHERE source = 'aws.ec2'
INSERT INTO codebuild_event_table SELECT * from input_table WHERE source = 'aws.codebuild'
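
In PyFlink I would presumably submit both inserts as a single job through a statement set. A minimal sketch of what I have in mind, assuming input_table and the two sink tables are already registered against real connectors:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch only: fan the multiplexed stream out to per-event-type sink tables.
# input_table, ec2_event_table and codebuild_event_table are assumed to exist.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql(
    "INSERT INTO ec2_event_table SELECT * FROM input_table WHERE source = 'aws.ec2'")
stmt_set.add_insert_sql(
    "INSERT INTO codebuild_event_table SELECT * FROM input_table WHERE source = 'aws.codebuild'")
stmt_set.execute()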

How do I define input_table (the input table containing the multiplexed events)? I've tried:

CREATE TABLE input_table (
            source VARCHAR,
            detail MAP
          )

but this gives me an error: I need to specify the key and value types of the MAP, e.g. MAP<VARCHAR, VARCHAR>; I can't use a bare MAP<>.

How do I refer to deeply nested JSON using PyFlink SQL?

Is what I am trying to do possible with SQL and the Table API, or do I need to use the DataStream API? I don't want to create a different input stream for each different event type.

Comments (2)

萝莉病 2025-01-27 07:31:20

The detail column could be declared as VARCHAR, and input_table could then be defined as follows:

CREATE TABLE input_table (
    version VARCHAR,
    id VARCHAR,
    `detail-type` VARCHAR,
    source VARCHAR,
    account VARCHAR,
    `time` VARCHAR,
    region VARCHAR,
    detail VARCHAR
) WITH (
    ...
)

Moreover, if you want to process the detail column, you can parse it as JSON in a Python UDF, as follows:

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def get_id(detail):
    # detail is the raw JSON string taken from the detail column
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']
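
A rough sketch of how the pieces could be wired together (the UDF above, plus the input_table DDL, which is assumed to have been executed with real connector options):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch: register the get_id UDF defined above and call it from SQL.
# input_table is assumed to be registered already, with detail declared as VARCHAR.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
table_env.create_temporary_function("get_id", get_id)

result = table_env.sql_query("SELECT source, get_id(detail) AS event_id FROM input_table")
result.execute().print()
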
姜生凉生 2025-01-27 07:31:20

I have been searching for a similar functionality (nested-JSON schema definition) in Flink SQL. So far, these are my findings from Flink 1.15.x.

Example JSON

{
   "id":1,
   "name":"Temperature Sensor",
   "payload":{
      "data":{
         "metric":"Temperature",
         "value":23,
      },
      "location":"Berlin",
      "timestamp":"2018-12-10 13:45:00.000"
   }
}

Flink 1.15.x SQL CREATE Statement:

CREATE TABLE sensors (
    id INT,
    name STRING,
    payload STRING  -- payload is declared as STRING since there is no JSON datatype in Apache Flink
) WITH (
    'connector' = 'kafka',
    ...
    -- rest of the Apache Kafka connection properties
    ...
);

JSON Functions (Introduced in 1.15.x, released in April '22. The latest is 1.15.1, released July '22)

SELECT * 
FROM sensors 
WHERE JSON_EXISTS(payload, '$.data');

SELECT DISTINCT JSON_VALUE(payload, '$.location') AS `city`
FROM sensors 
WHERE JSON_EXISTS(payload, '$.data');
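
Applied to the EventBridge table from the question (with detail declared as a plain STRING/VARCHAR column), the same functions would look roughly like this. The JSON paths come from the sample payloads; the sink tables (ec2_event_table, codebuild_event_table) are assumed to exist with matching columns:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch: route by source and pull nested fields out of the raw detail string
# with the 1.15+ JSON functions. input_table and the sink tables are assumed to exist.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql("""
    INSERT INTO ec2_event_table
    SELECT id,
           JSON_VALUE(detail, '$."instance-id"') AS instance_id,
           JSON_VALUE(detail, '$.state') AS state
    FROM input_table
    WHERE source = 'aws.ec2'
""")
stmt_set.add_insert_sql("""
    INSERT INTO codebuild_event_table
    SELECT id,
           JSON_VALUE(detail, '$."build-status"') AS build_status,
           JSON_VALUE(detail, '$."additional-information".artifact.location') AS artifact_location
    FROM input_table
    WHERE source = 'aws.codebuild'
""")
stmt_set.execute()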

Unfortunately, I couldn't find a proper way of defining a schema for the nested JSON (in this case, payload is simply treated as a string). It would have been better if that were supported.

References:

  1. Looking ahead to the new JSON SQL functions in Apache Flink® 1.15.0

  2. Apache Flink documentation
