json load failing for nested json in a kinesis firehose stream

Posted 2025-01-13 10:59:08

I am trying to stream CloudWatch metrics to S3 using Kinesis Firehose, and I am using a Python Lambda function to transform the data. My main issue is that json.loads fails on the nested payload JSON. I have tried multiple ways to get it working but have not been able to. All of the debug output is pasted below.

import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')
        payload_obj = json.loads(payload)
        # Do custom processing on the payload here
        payload_obj['event_timestamp'] = int(payload_obj['timestamp'] / 1000)
        print(payload_obj)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload_obj, default=str).encode('utf-8')).decode("utf-8")
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

Error is:

[ERROR] JSONDecodeError: Extra data: line 2 column 1 (char 373)
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 13, in lambda_handler
    payload_obj = json.loads(payload)
  File "/var/lang/lib/python3.9/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/var/lang/lib/python3.9/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)

The payload is shown as a screenshot in the original post. Each record in it looks like this:

Record 1:

{
    "metric_stream_name": "timestamp-day-partition-parquet",
    "account_id": "123456",
    "region": "us-east-1",
    "namespace": "AWS/RDS",
    "metric_name": "ForwardingMasterOpenSessions",
    "dimensions": {
        "DBClusterIdentifier": "aurora-mysql-testbox",
        "Role": "WRITER"
    },
    "timestamp": 1646884680000,
    "value": {
        "max": 0,
        "min": 0,
        "sum": 0,
        "count": 1
    },
    "unit": "Count"
}

Record 2:

{
    "metric_stream_name": "atlas-timestamp-day-partition-parquet",
    "account_id": "123456",
    "region": "us-east-1",
    "namespace": "AWS/RDS",
    "metric_name": "Aurora_pq_request_not_chosen_update_delete_stmts",
    "dimensions": {
        "DBInstanceIdentifier": "test-aurora-mysql-sandbox-reader-1"
    },
    "timestamp": 1646884680000,
    "value": {
        "max": 0,
        "min": 0,
        "sum": 0,
        "count": 1
    },
    "unit": "Count"
}

疯到世界奔溃 2025-01-20 10:59:08

The error JSONDecodeError("Extra data", s, end) indicates that there is more than one top-level JSON object in the payload, which you have already noted as Record 1 and Record 2. json.loads() can only parse a single JSON document at a time, hence the "Extra data" error.
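
A quick way to see this behaviour (a minimal hypothetical repro, not taken from the original post) is to feed two concatenated objects straight into json.loads():

import json

# Two top-level JSON objects in one string, like the Firehose payload above.
payload = '{"metric_name": "A", "timestamp": 1646884680000}\n' \
          '{"metric_name": "B", "timestamp": 1646884680000}'

json.loads(payload)  # raises json.JSONDecodeError: Extra data: line 2 column 1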

See this Stack Overflow post for more details: Python json.loads shows ValueError: Extra data

An answer from the post suggests wrapping the JSON objects in an array, and then using json.loads() to parse the array.

payload = base64.b64decode(record['data']).decode('utf-8')
payload_list = json.loads("[" + payload + "]")
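
Wrapping the payload in "[" and "]" only produces valid JSON if the objects are already separated by commas, since JSON arrays require a comma between elements. If the objects arrive newline-delimited, as CloudWatch metric stream records typically do when delivered through Firehose, you can instead parse each line on its own. Below is a minimal sketch of how the handler from the question could do that; the newline-delimited assumption and the way the output records are re-joined are assumptions on my part, not something confirmed in the post.

import base64
import json


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Parse each non-empty line as its own JSON object instead of handing
        # the whole multi-object payload to json.loads() at once.
        # Assumption: the records are newline-delimited.
        metrics = [json.loads(line) for line in payload.splitlines() if line.strip()]

        for metric in metrics:
            metric['event_timestamp'] = int(metric['timestamp'] / 1000)

        # Re-emit the transformed objects, newline-delimited again, under the
        # same recordId so Firehose can match them back to the input record.
        transformed = '\n'.join(json.dumps(m, default=str) for m in metrics) + '\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed.encode('utf-8')).decode('utf-8')
        })

    return {'records': output}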