Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
We're attempting to set up AWS centralised logging.
CloudWatch Logs => Kinesis Data Stream => Firehose with Transform (Python Lambda function) => OpenSearch (ES)
I'm getting this error in Firehose (destination):
{
    "deliveryStreamARN": "arn:aws:firehose:us-east-1:012313756720:deliverystream/KDS-OPS-EPS",
    "destination": "arn:aws:es:us-east-1:012313756720:domain/aws-cent-qa",
    "deliveryStreamVersionId": 1,
    "message": "{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\",\"caused_by\":{\"type\":\"not_x_content_exception\",\"reason\":\"Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes\"}}",
    "errorCode": "400",
    "processor": "arn:aws:lambda:us-east-1:012313756720:function:DELIIVETYSTLAMBDA:$LATEST"
}
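For reference, not_x_content_exception generally means OpenSearch could not parse the document body it received as JSON. Below is a small standalone sketch I use locally to check what a single incoming record actually contains before the transform runs; the helper name inspect_record and its b64_data argument are my own, and it is not part of the pipeline code.

# Minimal local sketch (my own debugging helper, not part of the Lambda):
# decode one CloudWatch Logs record the same way the Lambda does and flag
# any log event whose 'message' is not itself valid JSON.
import base64
import gzip
import json

def inspect_record(b64_data):
    """b64_data: the base64 'data' field of one incoming Firehose record."""
    payload = json.loads(gzip.decompress(base64.b64decode(b64_data)))
    print("messageType:", payload.get("messageType"))
    for log_event in payload.get("logEvents", []):
        # Each 'message' must be parseable JSON for OpenSearch to index it.
        try:
            json.loads(log_event["message"])
        except (ValueError, KeyError) as exc:
            print("non-JSON log event:", log_event.get("message"), "->", exc)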
Python Code
import base64
import json
import gzip
from io import BytesIO
import boto3
from datetime import datetime
def transformLogEvent(log_event):
    """Transform each log event.

    The default implementation below just extracts the message and appends a newline to it.

    Args:
        log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
        str: The transformed log event.
    """
    # Firstly, I set the index name.
    indexName = "cw-fh-es"
    # I can then apply a daily index rotation to the index, although this is optional. Comment out the rotation below if you want one long-lived index.
    # Please note that you should keep only one of the options below (daily or monthly) uncommented if using them.
    # Additionally, please ensure that the index rotation configuration set for the Firehose stream matches what you have set in the code.
    # The below applies a daily index rotation.
    indexName = indexName + "-" + str(datetime.now().year) + "-" + ("0" + str(datetime.now().month))[-2:] + "-" + ("0" + str(datetime.now().day))[-2:]
    # The below applies a monthly index rotation.
    # indexName = indexName + "-" + str(datetime.now().year) + "-" + ("0" + str(datetime.now().month))[-2:]
    # In transformLogEvent, I add the line '{ "index" : { "_index" : "<indexName>" } }' before each CloudWatch log, with a newline in between and a newline after.
    # This returns a result similar to the following once all CloudWatch logs are passed:
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    return '{ "index" : { "_index" : "' + indexName + '" } }' + '\n' + log_event['message'] + '\n'
# The goal of this adjustment is to construct a bulk request from the decompressed logs, leaving space for the line { "index" : { "_index" : "test", "_id" : "1" } } that Firehose will add when sending the bulk request to OpenSearch.
# This allows us to send multiple JSON objects in a single Firehose record to OpenSearch.
def processRecords(records):
    for r in records:
        data = base64.b64decode(r['data'])
        striodata = BytesIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())
        recId = r['recordId']
        """
        CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        They do not contain actual data.
        """
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            print("data['logEvents']: " + str(data['logEvents']))
            joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
            # After receiving the result from transformLogEvent, I want to remove the first index specification line, as Firehose will add this when sending the bulk request to Elasticsearch.
            # I do this by first splitting the data into a list on the newline.
            print("joinedData before split: " + joinedData)
            joinedDataList = joinedData.split('\n')
            # Then I delete the first item from this list, which is the first '{ "index" : { "_index" : "<indexName>" } }' entry.
            del(joinedDataList[0])
            # I then create an empty string and append each remaining item from the list.
            print("joinedDataListLine START_LINE")
            joinedData = ""
            for line in joinedDataList:
                joinedData += line + '\n'
                print("joinedDataListLine " + line)
            print("joinedDataListLine END_LINE")
            # This leaves me with the following:
            # { "field1" : "value1" }
            # { "index" : { "_index" : "<indexName>" } }
            # { "field1" : "value1" }
            # { "index" : { "_index" : "<indexName>" } }
            # { "field1" : "value1" }
            # { "index" : { "_index" : "<indexName>" } }
            # { "field1" : "value1" }
            dataBytes = joinedData.encode("utf-8")
            encodedData = base64.b64encode(dataBytes)
            if len(encodedData) <= 6000000:
                yield {
                    'data': encodedData,
                    'result': 'Ok',
                    'recordId': recId
                }
            else:
                yield {
                    'result': 'ProcessingFailed',
                    'recordId': recId
                }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }
def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out; adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])
        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out; adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_records succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])
        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
def createReingestionRecord(isSas, originalRecord):
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}

def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}
def lambda_handler(event, context):
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0
    for idx, rec in enumerate(records):
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(rec['recordId'])
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])

        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')

    return {"records": records}
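To see what actually reaches OpenSearch, I run the handler locally with a synthetic CloudWatch Logs payload. This is only a test sketch under my own assumptions: the ARN, record ID and field values are placeholders, the file is executed outside Lambda, and the block is appended to the bottom of the same module so lambda_handler is already defined.

# Local test harness (sketch, not part of the deployed function):
# build one synthetic CWL DATA_MESSAGE, call lambda_handler, and print the
# decoded bulk body that Firehose would hand to OpenSearch.
if __name__ == "__main__":
    cwl_payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "012313756720",
        "logGroup": "test-group",
        "logStream": "test-stream",
        "subscriptionFilters": ["test-filter"],
        "logEvents": [
            {"id": "1", "timestamp": 1700000000000, "message": json.dumps({"field1": "value1"})},
            {"id": "2", "timestamp": 1700000000001, "message": json.dumps({"field1": "value2"})},
        ],
    }
    # CloudWatch Logs delivers records to Firehose gzip-compressed and base64-encoded.
    record_data = base64.b64encode(gzip.compress(json.dumps(cwl_payload).encode("utf-8"))).decode("utf-8")
    test_event = {
        "deliveryStreamArn": "arn:aws:firehose:us-east-1:012313756720:deliverystream/KDS-OPS-EPS",
        "records": [{"recordId": "49500000000000000000000000000000000000000000000000000001", "data": record_data}],
    }
    result = lambda_handler(test_event, None)
    for rec in result["records"]:
        if rec["result"] == "Ok":
            # This is the NDJSON body the delivery stream sends in the bulk request.
            print(base64.b64decode(rec["data"]).decode("utf-8"))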