Kinesis Firehose Lambda Transformation and Dynamic Partitioning

Published 2025-01-17 09:31:53


The following sample data comes from the faker library. I am trying to learn and implement dynamic partitioning in Kinesis Firehose.

Sample payload Input

{
   "name":"Dr. Nancy Mcmillan",
   "phone_numbers":"8XXXXX",
   "city":"Priscillaport",
   "address":"908 Mitchell Views SXXXXXXXX 42564",
   "date":"1980-07-11",
   "customer_id":"3"
}

Sample producer code (sends records to the Kinesis stream that feeds Firehose)

import json
import random

import boto3
from faker import Faker  # pip install faker

AWS_ACCESS_KEY = "XXXXX"
AWS_SECRET_KEY = "XXX"
AWS_REGION_NAME = "us-east-1"


def main():
    faker = Faker()
    # Create the client once, outside the loop.
    client = boto3.client(
        "kinesis",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION_NAME,
    )

    for _ in range(12):
        json_data = {
            "name": faker.name(),
            "phone_numbers": faker.phone_number(),
            "city": faker.city(),
            "address": faker.address(),
            "date": str(faker.date()),
            "customer_id": str(random.randint(1, 5)),
        }
        print(json_data)

        # Note: this PartitionKey is the Kinesis *stream* shard key; it is
        # unrelated to Firehose dynamic partitioning.
        response = client.put_record(
            StreamName='XXX',
            Data=json.dumps(json_data),
            PartitionKey='test',
        )
        print(response)

Here is the Lambda transformation code, which works fine:

import base64
import json


def lambda_handler(event, context):
    print("Event")
    print(event)

    output = []

    for record in event["records"]:
        # Firehose delivers each record's data base64-encoded.
        payload = json.loads(base64.b64decode(record["data"]))
        print("payload", payload)

        # Re-serialize with json.dumps (not str(), which produces a Python
        # repr with single quotes) and append a newline so the objects
        # written to S3 are valid newline-delimited JSON.
        json_new_line = json.dumps(payload) + "\n"
        data = base64.b64encode(json_new_line.encode("utf-8")).decode("utf-8")

        partition_keys = {"customer_id": payload.get("customer_id")}

        transformed = {
            "recordId": record["recordId"],
            "result": "Ok",
            "data": data,
            "metadata": {"partitionKeys": partition_keys},
        }
        print(transformed)
        output.append(transformed)

    print("*****************")
    print(output)
    return {"records": output}
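To make the record shapes concrete, here is a minimal, self-contained round trip of a single record (the `recordId` and payload values here are made up for illustration; the field names match the Firehose transformation contract):

```python
import base64
import json

# A record as Firehose hands it to the transformation Lambda:
# the "data" field is the base64-encoded original payload.
payload = {"name": "Dr. Nancy Mcmillan", "customer_id": "3"}
record = {
    "recordId": "rec-1",
    "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
}

# What the Lambda should hand back: the decoded payload re-encoded as
# newline-delimited JSON, plus the partition key under metadata.
decoded = json.loads(base64.b64decode(record["data"]))
out_data = base64.b64encode(
    (json.dumps(decoded) + "\n").encode("utf-8")
).decode("utf-8")
transformed = {
    "recordId": record["recordId"],
    "result": "Ok",
    "data": out_data,
    "metadata": {"partitionKeys": {"customer_id": decoded["customer_id"]}},
}
print(transformed["metadata"])
```

Firehose matches each output record to its input by `recordId`, so that field must be passed through unchanged.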






Sample screenshots show the transformation works fine.

Here are the Firehose settings for dynamic partitioning:


For some reason, on AWS S3 I see an error folder, and all my messages end up there.

I have successfully implemented the Lambda transformation and made a video, which can be found below. I am currently stuck on dynamic partitioning; I have tried reading several posts, but that didn't help.

https://www.youtube.com/watch?v=6wot9Z93vAY&t=231s

Thank you again, and I look forward to hearing from you.

References

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

https://www.youtube.com/watch?v=HcOVAFn-KhM

https://www.youtube.com/watch?v=PoaKgHdJgCE

https://medium.com/@bv_subhash/kinesis-firehose-performs-partitioning-based-on-timestamps-and-creates-files-in-s3-but-they-would-13efd51f6d39

https://www.amazonaws.cn/en/new/2021/s3-analytics-dynamic-partitioning-kinesis-data-firehose/


Comments (1)

楠木可依 2025-01-24 09:31:53


There are two prefix options for dynamic partitioning: 1) partitionKeyFromQuery and 2) partitionKeyFromLambda. If you want Firehose to parse the record and extract the partition key, use the first option. If you want to provide the partition key after performing a transformation, use the second option.
As per your Firehose config, you are using a Lambda to provide the partition key (the second option), but your prefix is written for the first option. To resolve this, either disable inline parsing and use the second option in the Firehose S3 prefix, !{partitionKeyFromLambda:customer_id}/, or remove the Lambda transformation and keep inline parsing.
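Concretely, the first fix would look something like the following in the delivery stream's destination settings (a sketch; the key name `customer_id` must match the key used in the Lambda's `partitionKeys` map, and the error prefix name is just an example):

```
Dynamic partitioning:      enabled
Inline parsing for JSON:   disabled
S3 bucket prefix:          !{partitionKeyFromLambda:customer_id}/
S3 error output prefix:    error/
```

With this prefix, a record whose Lambda output carries `{"customer_id": "3"}` lands under the `3/` prefix in the bucket, while records that fail transformation or partitioning go to the error prefix.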
