json load failing for nested json in kinesis firehose stream
I am trying to stream CloudWatch metrics to S3 using Kinesis Firehose. I am using a Lambda Python function to manipulate the data. My main issue is that json.loads fails on the nested payload JSON. I have tried multiple ways to get it working but have not been able to. Pasting all the debug outputs below.
import base64
import json

print('Loading function')

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')
        payload_obj = json.loads(payload)
        # Do custom processing on the payload here
        payload_obj['event_timestamp'] = int(payload_obj['timestamp'] / 1000)
        print(payload_obj)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload_obj, default=str).encode('utf-8')).decode("utf-8")
        }
        output.append(output_record)
    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
The error is:
[ERROR] JSONDecodeError: Extra data: line 2 column 1 (char 373)
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 13, in lambda_handler
payload_obj = json.loads(payload)
File "/var/lang/lib/python3.9/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/var/lang/lib/python3.9/json/decoder.py", line 340, in decode
raise JSONDecodeError("Extra data", s, end)
The payload is as below; each record looks like this:
Record 1:
{
"metric_stream_name": "timestamp-day-partition-parquet",
"account_id": "123456",
"region": "us-east-1",
"namespace": "AWS/RDS",
"metric_name": "ForwardingMasterOpenSessions",
"dimensions": {
"DBClusterIdentifier": "aurora-mysql-testbox",
"Role": "WRITER"
},
"timestamp": 1646884680000,
"value": {
"max": 0,
"min": 0,
"sum": 0,
"count": 1
},
"unit": "Count"
}
Record 2:
{
"metric_stream_name": "atlas-timestamp-day-partition-parquet",
"account_id": "123456",
"region": "us-east-1",
"namespace": "AWS/RDS",
"metric_name": "Aurora_pq_request_not_chosen_update_delete_stmts",
"dimensions": {
"DBInstanceIdentifier": "test-aurora-mysql-sandbox-reader-1"
},
"timestamp": 1646884680000,
"value": {
"max": 0,
"min": 0,
"sum": 0,
"count": 1
},
"unit": "Count"
}
1 Answer
The error JSONDecodeError("Extra data", s, end) indicates that there are multiple JSON objects in the payload, which you have already noted as Record 1 and Record 2. json.loads() cannot parse multiple concatenated JSON objects, hence the "Extra data" error. See this Stack Overflow post for more details: Python json.loads shows ValueError: Extra data.
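The failure is easy to reproduce in isolation; this small, hypothetical two-object payload mirrors what your Firehose record contains:

import json

# Two JSON objects concatenated into one string, as in the Firehose payload
payload = '{"a": 1}\n{"b": 2}'
json.loads(payload)  # raises JSONDecodeError: Extra data: line 2 column 1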
An answer from that post suggests wrapping the JSON objects in an array and then parsing the array with json.loads().
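For reference, here is a minimal sketch of how the handler could be adapted. It assumes the decoded payload is newline-delimited JSON (which is how CloudWatch metric streams typically batch several objects into one Firehose record) and parses each object separately, which has the same effect as wrapping them in an array before parsing:

import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        transformed = []
        # Assumed format: one JSON object per line in the decoded payload
        for line in payload.splitlines():
            if not line.strip():
                continue  # skip blank lines between objects
            obj = json.loads(line)
            obj['event_timestamp'] = int(obj['timestamp'] / 1000)
            transformed.append(json.dumps(obj, default=str))
        # Re-emit the objects newline-delimited so the S3 destination
        # still receives one JSON object per line
        data = '\n'.join(transformed) + '\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(data.encode('utf-8')).decode('utf-8')
        })
    return {'records': output}

This is only a sketch under the newline-delimited assumption; if your records are concatenated without newlines you would need an incremental parse (for example with json.JSONDecoder().raw_decode) instead of splitting on lines.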