Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes

Posted on 2025-01-13 00:56:56

We're attempting to set up AWS centralised logging.

CloudWatch Logs => Kinesis Data Stream => Firehose with Transform (Python Lambda function) => OpenSearch (ES)
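For context, the transform Lambda in this pipeline receives each record as base64-encoded, gzip-compressed JSON, which is what `processRecords` in the code further down unpacks. A minimal sketch of that decoding step, using a made-up sample payload (the helper name `decode_cwl_record` and the sample values are assumptions for illustration, not part of the original code):

```python
import base64
import gzip
import json


def decode_cwl_record(data_b64):
    """Decode one base64-encoded, gzip-compressed CloudWatch Logs payload."""
    return json.loads(gzip.decompress(base64.b64decode(data_b64)))


# Hypothetical sample payload, shaped like a CloudWatch Logs DATA_MESSAGE.
sample = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "/example/group",
    "logEvents": [
        {"id": "1", "timestamp": 1700000000000, "message": '{"field1": "value1"}'}
    ],
}

# Encode the sample the same way the record arrives at the Lambda.
encoded = base64.b64encode(
    gzip.compress(json.dumps(sample).encode("utf-8"))
).decode("utf-8")

decoded = decode_cwl_record(encoded)
print(decoded["messageType"], len(decoded["logEvents"]))
```

Note that `message` inside each log event is just a string; whether it contains JSON depends entirely on what the application logged.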

I am getting this error in Firehose (destination):

{
"deliveryStreamARN": "arn:aws:firehose:us-east-1:012313756720:deliverystream/KDS-OPS-EPS",
"destination": "arn:aws:es:us-east-1:012313756720:domain/aws-cent-qa",
"deliveryStreamVersionId": 1,
"message": "{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\",\"caused_by\":{\"type\":\"not_x_content_exception\",\"reason\":\"Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes\"}}",
"errorCode": "400",
"processor": "arn:aws:lambda:us-east-1:012313756720:function:DELIIVETYSTLAMBDA:$LATEST"
}
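As far as I understand it, `not_x_content_exception` means OpenSearch could not recognise a document body as a supported content type such as JSON. A small local check like the one below can help narrow down which lines of the transformed payload would not parse as a JSON object. This is only a diagnostic sketch; `is_json_object` and the sample strings are hypothetical.

```python
import json


def is_json_object(text):
    """Return True only if text parses as a JSON object (a dict)."""
    try:
        return isinstance(json.loads(text), dict)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError.
        return False


# Hypothetical lines: a JSON document, a plain-text log line, and an empty line.
samples = [
    '{"field1": "value1"}',
    "START RequestId: abc Version: $LATEST",
    "",
]
for line in samples:
    print(repr(line), is_json_object(line))
```

Running the same check over each line printed by the Lambda's `joinedDataListLine` debug output would show whether any plain-text or empty lines are being sent.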

Python Code

import base64
import json
import gzip
from io import BytesIO
import boto3
from datetime import datetime


def transformLogEvent(log_event):
    """Transform each log event.

    The default implementation below just extracts the message and appends a newline to it.

    Args:
    log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
    str: The transformed log event.
    """
    
    # Firstly, I set the index name.
    indexName = "cw-fh-es"
    
    # I can then apply a daily index rotation to the index, although this is optional. Comment the below if you want to have one, long-lived index.
    # Please note that you should only keep one of the below options, daily or monthly uncommented if using them.
    # Additionally, please ensure that the index rotation configuration set for the Firehose stream matches what you have set in the code.
    
    # The below applies a daily index rotation.
    indexName = indexName + "-" + datetime.now().strftime("%Y-%m-%d")
    
    # The below applies a monthly index rotation.
    # indexName = indexName + "-" + datetime.now().strftime("%Y-%m")
    
    # In the transformLogEvent function, I added the line '{ "index" : { "_index" : "<indexName>" } }' before each CloudWatch log, with a new line in between, and a new line after.
    
    # This then returns a result similar to the following once all CloudWatch logs are passed:

    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    # { "index" : { "_index" : "<indexName>" } }
    # { "field1" : "value1" }
    
    return '{ "index" : { "_index" : "'+ indexName +'" } }' + '\n' + log_event['message'] + '\n'

# The goal of this adjustment to the function is to construct a bulk request using the decompressed logs, leaving space for the line { "index" : { "_index" : "test", "_id" : "1" } } that Firehose will add upon sending the bulk request to OpenSearch.
# This allows us to send multiple JSON objects in a single Firehose record to OpenSearch.

def processRecords(records):
    for r in records:
        data = base64.b64decode(r['data'])
        striodata = BytesIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())

        recId = r['recordId']
        """
        CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        They do not contain actual data.
        """
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            print("data['logEvents']: " + str(data['logEvents']))
            joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
            
            # In the processRecords function, after receiving the result from transformLogEvent, I want to remove the first index specification line as Firehose will add this when sending the bulk request to Elasticsearch.
            # I did this by first splitting the data into a list based on the new line.
            print("joinedData before split: " + joinedData)
            joinedDataList = joinedData.split('\n')
            
            # Then I delete the first item from this list, which would be the first entry of { "index" : { "_index" : "<indexName>" } } in the list.
            del(joinedDataList[0])
            
            # I then create an empty string, and add each remaining item from the list.
            print("joinedDataListLine START_LINE")
            
            joinedData = ""
            for line in joinedDataList:
                joinedData += line + '\n'
                print("joinedDataListLine " + line)
                
            print("joinedDataListLine END_LINE")
            
            # This would leave me with the following:

            # { "field1" : "value1" }
            # { "index" : { "_index" : "<indexName>" } }
            # { "field1" : "value1" }
            # { "index" : { "_index" : "<indexName>" } }
            # { "field1" : "value1" }
            # { "index" : { "_index" : "<indexName>" } }
            # { "field1" : "value1" }
            
            dataBytes = joinedData.encode("utf-8")
            # Decode to str so the Lambda response stays JSON-serializable on Python 3.
            encodedData = base64.b64encode(dataBytes).decode("utf-8")
            if len(encodedData) <= 6000000:
                yield {
                    'data': encodedData,
                    'result': 'Ok',
                    'recordId': recId
                }
            else:
                yield {
                    'result': 'ProcessingFailed',
                    'recordId': recId
                }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_records succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def createReingestionRecord(isSas, originalRecord):
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}


def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}


def lambda_handler(event, context):
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0

    for idx, rec in enumerate(records):
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(rec['recordId'])
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])

        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')

    return {"records": records}
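One thing worth checking, offered as an assumption rather than a confirmed diagnosis: `transformLogEvent` forwards `log_event['message']` unchanged, so a plain-text or multi-line message would reach OpenSearch as a non-JSON line. In addition, the rebuild loop in `processRecords` leaves an extra blank line at the end of the payload, because `split('\n')` yields a trailing empty string that then gets its own `'\n'`. The sketch below shows one way to keep every document line a JSON object while still omitting the first action line, as the code above intends. `to_document` and `build_record_body` are hypothetical helpers, and whether a trailing newline is needed depends on how Firehose assembles the bulk request, which this sketch does not verify.

```python
import json


def to_document(log_event):
    """Return a single-line JSON object string for one log event."""
    message = log_event["message"]
    try:
        parsed = json.loads(message)
    except ValueError:
        parsed = None
    if not isinstance(parsed, dict):
        # Plain-text (or non-object) message: wrap it so the document is a JSON object.
        parsed = {"message": message}
    # json.dumps escapes embedded newlines, so each document stays on one line.
    return json.dumps(parsed)


def build_record_body(log_events, index_name):
    """Join documents with an action line between them, omitting the first action line."""
    action = json.dumps({"index": {"_index": index_name}})
    docs = [to_document(e) for e in log_events]
    return ("\n" + action + "\n").join(docs) + "\n"


# Hypothetical events: one JSON message and one multi-line plain-text message.
events = [
    {"message": '{"field1": "value1"}'},
    {"message": "plain\ntext"},
]
body = build_record_body(events, "cw-fh-es")
print(body)
```

Joining the documents with the action line as the separator avoids the split-and-delete step entirely, so no empty line is produced between or after the documents.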
