How to invoke an AWS Lambda asynchronously from a Kafka trigger

Posted 2025-01-13 17:34:20

I am trying to work out how to handle unhandled exceptions (say, Lambda timeouts) when a Lambda function is invoked directly from an Apache Kafka event stream (without Kafka Connect), ideally by having it invoked asynchronously.

When I attach a Kafka trigger to the Lambda, it does not appear to send events to the asynchronous destinations (OnFailure/OnSuccess to SNS/SQS/Lambda). Since these destinations are never triggered, I concluded that the Lambda is invoked synchronously by the Kafka trigger, unlike with the S3 event source, which invokes it asynchronously (I was able to test that).

Here is my idea for handling the two kinds of exceptions: HandledExceptions (known application errors) and UnhandledExceptions (say, Lambda timeouts, i.e. the task is killed after 5 seconds).

My code already works correctly for HandledExceptions: they are caught and sent to the retry/DLQ topics. What I am trying to understand is how to catch the UnhandledExceptions by sending them to another Lambda through an OnFailure destination.
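One possible way to keep a timeout from becoming an unhandled failure is to watch the remaining execution time and hand unprocessed records to the retry path before Lambda kills the task. The sketch below is only an illustration under assumptions: `process_batch`, `handle`, `defer`, `SAFETY_MARGIN_MS` and `FakeContext` are hypothetical names, not part of my code. The only real API it relies on is `context.get_remaining_time_in_millis()`, which the Lambda Python runtime provides on the context object.

```python
SAFETY_MARGIN_MS = 1000  # hypothetical buffer kept free for publishing to the retry topic


def process_batch(records, context, handle, defer, safety_margin_ms=SAFETY_MARGIN_MS):
    """Process records until the Lambda deadline is close, then defer the rest.

    handle(record) does the real work; defer(record, reason) is assumed to
    publish the record to a retry topic. Both are supplied by the caller.
    Returns (processed_count, deferred_count).
    """
    processed = 0
    deferred = 0
    for record in records:
        # Check the remaining time before starting each record.
        if context.get_remaining_time_in_millis() <= safety_margin_ms:
            defer(record, "deadline approaching")
            deferred += 1
            continue
        handle(record)
        processed += 1
    return processed, deferred


class FakeContext:
    """Stand-in for the Lambda context object, for local experiments only."""

    def __init__(self, remaining_ms_sequence):
        self._remaining = iter(remaining_ms_sequence)

    def get_remaining_time_in_millis(self):
        return next(self._remaining)
```

This only helps when the check runs between records. A single call that blocks past the timeout (like the `time.sleep(15)` in my test) would still be killed by Lambda.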

Here is my sample Python Lambda code (assume the timeout is set to 5 seconds; the code sleeps for 15 seconds just to test this flow):

import json
import time

# Assumed to be defined elsewhere in the module (not shown here):
# transform(), producer, retry_topic and dlq_topic.


def process_record(record):
    headers = {}
    key = {}
    value = {}
    try:
        key = json.loads(transform(record["key"]))
        value = json.loads(transform(record["value"]))
        # Each header arrives as {name: [byte, byte, ...]}; join the bytes into a string.
        headers = {k: "".join(map(chr, v))
                   for list_item in record["headers"] for (k, v) in list_item.items()}

        # Intentional: sleep past the 5-second Lambda timeout to provoke an unhandled failure.
        time.sleep(15)

        # process it; if everything is good, move on
        # blah blah blah code

        # otherwise assume we hit a known error somewhere: raise it so it goes to retry/DLQ
        raise Exception("something went wrong")
    except Exception as e:
        print(f"Event failed with error, sending it to retry topic: {e}")

        dlq_sent = False
        if not headers:
            headers = {'process_count': 1, 'error_reason': str(e)}
        else:
            # On a repeat failure, route to either the retry topic or the DLQ topic.
            # .get() avoids a KeyError when other headers exist but process_count does not.
            process_counter = int(headers.get("process_count", 0)) + 1
            headers["process_count"] = process_counter
            headers["error_reason"] = str(e)
            if process_counter > 3:
                # send it to DLQ
                headers["process_count"] = process_counter - 1
                dlq_sent = True
        # send it to retry/DLQ topic
        print(f"Modified headers are: {headers}")
        headers = [(k, bytes(str(v), 'utf-8')) for k, v in headers.items()]
        target_topic = dlq_topic if dlq_sent else retry_topic
        producedRecord = producer.send(
            target_topic, value=value, key=str(key).encode('utf-8'), headers=headers)
        print(
            f'producer response to target topic {target_topic} is {producedRecord.get()}')


def handler(event, context):
    print('**********PROCESSING*********')
    print("Event received", event)
    # event["records"] maps each "topic-partition" name to a list of records
    x: dict = event["records"]
    for topicData in x.values():
        for record in topicData:
            process_record(record)
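The `transform` helper used above is not shown. Since Lambda delivers Kafka record keys and values base64-encoded and headers as lists of byte values, a minimal sketch of the decoding step might look like the following. `decode_headers`, `decode_record`, `_b64` and the sample record are made up for illustration, and `transform` here is only an assumption of what the helper does (base64-decode to UTF-8 text).

```python
import base64
import json


def transform(encoded):
    """Assumed behaviour of the helper: base64-decode a Kafka key/value to text."""
    return base64.b64decode(encoded).decode("utf-8")


def decode_headers(raw_headers):
    """Flatten [{name: [byte, ...]}, ...] into {name: "text"} (assumes UTF-8 values)."""
    return {name: bytes(values).decode("utf-8")
            for item in raw_headers
            for name, values in item.items()}


def decode_record(record):
    """Return (key, value, headers) with the JSON key and value parsed."""
    key = json.loads(transform(record["key"]))
    value = json.loads(transform(record["value"]))
    return key, value, decode_headers(record["headers"])


def _b64(obj):
    """Encode an object the way it would appear in the event (JSON, then base64)."""
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")


# Hypothetical record, shaped like one entry of event["records"]["orders-0"].
sample_record = {
    "topic": "orders",
    "partition": 0,
    "offset": 15,
    "key": _b64({"id": 7}),
    "value": _b64({"status": "new"}),
    "headers": [{"process_count": [50]}],  # byte 50 is the character "2"
}
```

Calling `decode_record(sample_record)` gives the parsed key, value, and a plain dict of headers, which is the form the retry/DLQ logic expects.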
