Kinesis Firehose Lambda transformation and dynamic partitioning
The data below is generated with the faker library. I am trying to learn and implement dynamic partitioning in Kinesis Firehose.
Sample payload input
{
"name":"Dr. Nancy Mcmillan",
"phone_numbers":"8XXXXX",
"city":"Priscillaport",
"address":"908 Mitchell Views SXXXXXXXX 42564",
"date":"1980-07-11",
"customer_id":"3"
}
Sample producer code
def main():
    import random
    import json
    import boto3
    from faker import Faker

    AWS_ACCESS_KEY = "XXXXX"
    AWS_SECRET_KEY = "XXX"
    AWS_REGION_NAME = "us-east-1"

    for i in range(1, 13):
        faker = Faker()
        json_data = {
            "name": faker.name(),
            "phone_numbers": faker.phone_number(),
            "city": faker.city(),
            "address": faker.address(),
            "date": str(faker.date()),
            "customer_id": str(random.randint(1, 5))
        }
        print(json_data)

        # MyHasher is the class defined in the Lambda code below; its result is unused here
        hasher = MyHasher(key=json_data)
        res = hasher.get()

        client = boto3.client(
            "kinesis",
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY,
            region_name=AWS_REGION_NAME,
        )

        response = client.put_record(
            StreamName='XXX',
            Data=json.dumps(json_data),
            PartitionKey='test',
        )
        print(response)
Here is the Lambda code, which works fine
try:
    import json
    import boto3
    import base64
    from dateutil import parser
except Exception as e:
    pass


class MyHasher(object):
    """Base64-encodes whatever is passed in as `key`."""

    def __init__(self, key):
        self.key = key

    def get(self):
        keys = str(self.key).encode("UTF-8")
        keys = base64.b64encode(keys)
        keys = keys.decode("UTF-8")
        return keys


def lambda_handler(event, context):
    print("Event")
    print(event)

    output = []

    for record in event["records"]:
        # Firehose delivers each record base64-encoded
        dat = base64.b64decode(record["data"])
        serialize_payload = json.loads(dat)
        print("serialize_payload", serialize_payload)

        # Note: str() produces a Python dict repr (single quotes), not strict JSON;
        # json.dumps(serialize_payload) would keep the output valid JSON
        json_new_line = str(serialize_payload) + "\n"

        hasherHelper = MyHasher(key=json_new_line)
        hash = hasherHelper.get()

        # metadata.partitionKeys is what the !{partitionKeyFromLambda:...} prefix reads
        partition_keys = {"customer_id": serialize_payload.get("customer_id")}

        _ = {
            "recordId": record["recordId"],
            "result": "Ok",
            "data": hash,
            "metadata": {
                "partitionKeys": partition_keys
            }
        }
        print(_)
        output.append(_)

    print("*****************")
    print(output)

    return {"records": output}
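A quick way to sanity-check the handler above is to feed it a hand-built Firehose transformation event; the invocationId, recordId, timestamp, and stream ARN below are made-up placeholders.

import base64
import json

sample_payload = {
    "name": "Dr. Nancy Mcmillan",
    "customer_id": "3",
}

# Hand-built Firehose transformation event with a single record
test_event = {
    "invocationId": "test-invocation-id",
    "deliveryStreamArn": "arn:aws:firehose:us-east-1:111111111111:deliverystream/XXX",
    "region": "us-east-1",
    "records": [
        {
            "recordId": "49546986683135544286507457936321625675700192471156785154",
            "approximateArrivalTimestamp": 1648000000000,
            "data": base64.b64encode(json.dumps(sample_payload).encode("UTF-8")).decode("UTF-8"),
        }
    ],
}

result = lambda_handler(test_event, None)
# Each returned record should carry result "Ok", the re-encoded data, and the
# metadata.partitionKeys dict used for dynamic partitioning.
print(result["records"][0]["metadata"]["partitionKeys"])  # -> {'customer_id': '3'}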
Sample screenshots show this working fine.
Here is the Firehose dynamic partitioning setup.
For some reason, on AWS S3 I see an error folder where all of my messages end up.
I have successfully implemented the Lambda transformation and made a video, which can be found below. I am currently stuck on dynamic partitioning; I have tried reading several articles, but that did not help.
https://www.youtube.com/watch?v=6wot9Z93vAY&t=231s
Thank you again, and I look forward to hearing from you.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
https://www.youtube.com/watch?v=HcOVAFn-KhM
https://www.youtube.com/watch?v=PoaKgHdJgCE
https://www.amazonaws.cn/en/new/2021/s3-analytics-dynamic-partitioning-kinesis-data-firehose/
Comments (1)
There are two prefix options for dynamic partitioning: 1) partitionKeyFromQuery and 2) partitionKeyFromLambda. If you want Firehose to parse the record and extract the partition key itself, use the first option. If you want to provide the partition key after performing a transformation, use the second option. According to your Firehose configuration, you are using a Lambda to supply the partition key (the second option), but the prefix you provided is for the first option. To resolve this, either disable inline parsing and switch the Firehose prefix to the second option, !{partitionKeyFromLambda:customer_id}/, or remove the Lambda transformation and keep inline parsing.
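As a sketch of the first fix, here is roughly how the delivery stream could be configured with boto3 so that the S3 prefix reads the Lambda-supplied key. The stream name, bucket, role, and Lambda ARNs are placeholders, and the buffering values are only examples (with dynamic partitioning enabled the buffer size must be at least 64 MB).

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="XXX-delivery-stream",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:111111111111:stream/XXX",
        "RoleARN": "arn:aws:iam::111111111111:role/firehose-delivery-role",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::111111111111:role/firehose-delivery-role",
        "BucketARN": "arn:aws:s3:::my-destination-bucket",
        # Option 2: the key comes from the Lambda's metadata.partitionKeys
        "Prefix": "customer_id=!{partitionKeyFromLambda:customer_id}/",
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 60},
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {
                            "ParameterName": "LambdaArn",
                            "ParameterValue": "arn:aws:lambda:us-east-1:111111111111:function:firehose-transform",
                        }
                    ],
                }
            ],
        },
    },
)

The inline-parsing alternative (option 1) would instead drop the Lambda processor, add a MetadataExtraction processor with a JQ query such as {customer_id: .customer_id} (JsonParsingEngine JQ-1.6), and use !{partitionKeyFromQuery:customer_id}/ in the Prefix.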