How to invoke an AWS Lambda asynchronously from a Kafka trigger
I am trying to explore how to handle unhandled exceptions (say, Lambda timeouts) by invoking a Lambda asynchronously from an Apache Kafka event stream directly (without Kafka Connect).
When I attach a Kafka trigger to the Lambda, it does not appear to send events to the asynchronous destinations (OnFailure/OnSuccess to SNS/SQS/Lambda).
Since it does not detect these destinations, I concluded that the Lambda is invoked synchronously from the Kafka trigger, unlike the S3 event source (which works asynchronously, and which I was able to test).
Here is my idea for handling the two kinds of exceptions: handled exceptions (known application errors) and unhandled exceptions (say, Lambda timeouts, i.e. the task is killed after 5 seconds).
My code already handles known application errors correctly; those records are caught and routed to the retry/DLQ topics. What I am trying to understand is how to catch the unhandled exceptions by sending them to another Lambda's OnFailure destination.
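One fallback I am considering is detecting an impending timeout inside the handler itself, using the context object's real `get_remaining_time_in_millis()` method, and routing any in-flight records to the retry topic before the task is killed. A minimal sketch (the `FakeContext` class is only a stand-in for the Lambda context so this can run locally):

```python
import time

TIMEOUT_GUARD_MS = 1000  # bail out when less than 1 s of execution time remains


def process_with_timeout_guard(records, context):
    """Process records, but stop early if the function is about to time out.

    Returns (processed, unprocessed) so the caller can forward the
    unprocessed records to a retry topic instead of losing them.
    """
    processed, unprocessed = [], []
    for i, record in enumerate(records):
        if context.get_remaining_time_in_millis() < TIMEOUT_GUARD_MS:
            unprocessed = records[i:]
            break
        # ... real per-record work would go here ...
        processed.append(record)
    return processed, unprocessed


class FakeContext:
    """Stand-in for the Lambda context object, for local testing only."""

    def __init__(self, timeout_seconds):
        self._deadline = time.monotonic() + timeout_seconds

    def get_remaining_time_in_millis(self):
        return max(0, int((self._deadline - time.monotonic()) * 1000))
```

With a context whose remaining time is already below the guard threshold, every record lands in `unprocessed` and can be republished instead of being silently dropped when the task is killed.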
Here is my sample Python Lambda code (assume the timeout is set to 5 seconds; the code sleeps for 15 seconds just to force a timeout and test this flow).
import base64
import json
import time

from kafka import KafkaProducer  # kafka-python client

# Assumed setup -- the original post does not show these definitions.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))
retry_topic = "retry-topic"
dlq_topic = "dlq-topic"


def transform(b64_text):
    # The Kafka trigger delivers key/value base64-encoded.
    return base64.b64decode(b64_text).decode("utf-8")


def process_record(record):
    headers = {}
    key = {}
    value = {}
    try:
        key = json.loads(transform(record["key"]))
        value = json.loads(transform(record["value"]))
        # Header values arrive as lists of byte integers; rebuild them as strings.
        headers = {k: "".join(map(chr, v))
                   for list_item in record["headers"] for (k, v) in list_item.items()}
        time.sleep(15)  # Intentional: force an unhandled Lambda timeout (timeout is set to 5 seconds).
        # Process it; if everything is good, move on.
        # Otherwise, assume we hit a known error, so raise it and send it to retry/DLQ.
        raise Exception("something went wrong")
    except Exception as e:
        print(f"Event failed with error, sending it to retry topic: {e}")
        dlq_sent = False
        if not headers:
            headers = {"process_count": 1, "error_reason": str(e)}
        else:
            # On subsequent failures, route to either the retry topic or the DLQ topic.
            process_counter = int(headers["process_count"]) + 1
            headers["process_count"] = process_counter
            headers["error_reason"] = str(e)
            if process_counter > 3:
                # Send it to the DLQ.
                headers["process_count"] = process_counter - 1
                dlq_sent = True
        # Send it to the retry/DLQ topic.
        print(f"Modified headers are: {headers}")
        headers = [(k, bytes(str(v), "utf-8")) for k, v in headers.items()]
        target_topic = dlq_topic if dlq_sent else retry_topic
        produced_record = producer.send(
            target_topic, value=value, key=str(key).encode("utf-8"), headers=headers)
        print(f"producer response to target topic {target_topic} is {produced_record.get()}")


def handler(event, context):
    print("**********PROCESSING*********")
    print("Event received", event)
    records: dict = event["records"]
    for topic_data in records.values():
        for record in topic_data:
            process_record(record)
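For local testing, the handler can be driven with a hand-built event in the shape the Kafka trigger delivers: `key` and `value` are base64-encoded, and `headers` is a list of single-entry dicts whose values are lists of byte integers. The `transform` helper below is my assumption of what the post's (undefined) helper does, namely a base64 decode:

```python
import base64
import json


def transform(b64_text):
    """Assumed behaviour of the post's `transform` helper: base64-decode a field."""
    return base64.b64decode(b64_text).decode("utf-8")


def make_kafka_event(topic, key, value, headers):
    """Build a minimal event matching the Kafka trigger's payload shape."""
    record = {
        "key": base64.b64encode(json.dumps(key).encode()).decode(),
        "value": base64.b64encode(json.dumps(value).encode()).decode(),
        # Each header value arrives as a list of byte integers.
        "headers": [{k: list(v.encode())} for k, v in headers.items()],
    }
    return {"records": {f"{topic}-0": [record]}}


event = make_kafka_event("orders", {"id": 1}, {"amount": 42}, {"process_count": "1"})
record = event["records"]["orders-0"][0]
key = json.loads(transform(record["key"]))
headers = {k: "".join(map(chr, v))
           for item in record["headers"] for k, v in item.items()}
```

Feeding such an event to the handler exercises the decode and header-rebuild logic without needing a real Kafka trigger (the topic name and fields here are made up for illustration).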