Python AWS library Boto3: kinesis.get_records(shard_iterator)

Posted on 2025-01-12 08:16:38

I am unable to process the records that I get from the boto3 Kinesis function get_records(ShardIterator=shard_iterator, Limit=123).

The data I get looks like b'\xe0\x01\x00\xea\xee\x01\xe4\x81\x83\xde\x01\xdf\x87\xbe\x01\xdb\x8dqldbStreamArn\x8arecordType\x87payload\x8cblockAddress\x88strandId\x8asequenceNo\x8dtransactionId\x8e\x8eblockTimestamp\x89blockHash\x8bentriesHash\x8e\x91previousBlockHash\x8e\x8fentriesHashList\x8e\x8ftransactionInfo\x8astatements\x89statement\x89startTime\x8e\x8fstatementDigest\xde\x03\xe5\x8a\x8e\xcdarn:aws:qldb:ap-southeast-1:417160253190:stream/wallet/5vHof6xwrwr2hzc2tw6HYw\x8b\x8dBLOCK_SUMMARY\x8c\xde\x03\x82\x8d\xde\x9d\x8e\x8e\x96G8ijftVQqbp6Azm3UptzGq\x8f"\x1b\xa9\x90\x8e\x96EVaGqOlkbkwB9mBuOkdKTI\x91k\x80\x0f\xe6\x83\x87\x89\x8c\x96\xc3\x01\r\x92\xae\xa0\x99R\xbe\xcba\xc5\xbc\xef\xcdd \xc2am\xc0\xb8Ro\xf3\x88\xa3tz\x94o\x8a\x98\x15`h@\x18\x93\xae\xa0\xee\xf2\x9d\xf5@*e\x13\x82 \xd5\xee\x88\x018\xc2\xc4\xa3\x15\xc9\x85 \xe7\xea\xbf\xb0\x1eU\xf0\x124w\x94\xae\xa0\x94\x1b@\x11V\x90S\xe7\x1b\x8ei\xf7\xc5\xba}\x0cJ\x10\xe9Rx\x91q\x14\x8c\x02`\x0b\xdc+\x13\xcb\x95\xbe\xc5\xae\xa0u/\xc6M\xb4\x97\xf8\xa9\xa4&1\xb7*\xb2\x17\x1dB\xc6c\x92\x9d\xcc\xecP2\r\xcf\xfa\xf8\x1f\x0f\x85\xa0\xae\xa0\xe9}\x7f\xc0>\xdf\xf5-\x8f\x9d\xa6kg\xa8\xf0\xbfHybS,\x83\xbc\x8f\xbe\x1bG\x07\xd1\xbf{:\x96\xde\x01\x87\x97\xbe\x01\x83\xde\x01\x80\x98\x8e\xcdSELECT VALUE name FROM information_schema.user_tables WHERE status = \'ACTIVE\'\x99k\x80\x0f\xe6\x83\x87\x89\x8c\x96\xc3\x00\xdd\x9a\xae\xa0\xaf\xbd\xae\x00\xbf\xb8ds\xb3\xf4\xa1\xd3tL\x9f\xe8R=\x12\xe2\xc5*\xbcwx\x98\xb1\xa37(\x1c\xcb'

Calling ion.loads(data) on it raises the error "Illegal character ó at position 1 in unknown value contained in top-level. Pending value: bytearray(b'')"

Please help me process this kind of data.

Comments (2)

微凉徒眸意 2025-01-19 08:16:38

Following the Kinesis.Client documentation, you have to provide a shard iterator; after iterating over the available records, you can continue with the next shard iterator returned in the response.

Here is a basic example of iterating over the records since some point in time:

import time

import boto3

if __name__ == '__main__':
    client = boto3.client("kinesis", region_name="us-east-1")
    # How to configure the shard iterator is up to you
    shard_iterator = client.get_shard_iterator(
        StreamName="demo_stream",
        ShardId="shardId-000000000001",
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=1459799926.480,
    )["ShardIterator"]

    while shard_iterator is not None:
        result = client.get_records(ShardIterator=shard_iterator)
        records = result["Records"]
        # NextShardIterator is null or absent once a closed shard has been fully read
        shard_iterator = result.get("NextShardIterator")

        for record in records:
            # record["Data"] is the raw payload as bytes
            print(record["Data"])

        # Pause between polls to stay within the per-shard GetRecords rate limit
        time.sleep(1)

Check out the documentation for more details.
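
On an open shard the loop above never ends, because every response carries a new NextShardIterator. For a batch job that should stop once it has caught up, the same loop can be wrapped in a generator. This is only a sketch: the function name iter_shard_records and its poll_interval and limit parameters are made up for illustration, and it assumes that an empty batch together with MillisBehindLatest == 0 in the GetRecords response means the reader has reached the tip of the shard.

```python
import time


def iter_shard_records(client, shard_iterator, poll_interval=0.2, limit=100):
    """Yield records from one shard until caught up or the shard is closed.

    `client` is any object exposing get_records(ShardIterator=..., Limit=...),
    for example boto3.client("kinesis").
    """
    while shard_iterator is not None:
        result = client.get_records(ShardIterator=shard_iterator, Limit=limit)
        records = result["Records"]
        yield from records

        # A closed shard stops returning NextShardIterator once fully read.
        shard_iterator = result.get("NextShardIterator")

        # Assumption: an empty batch with no lag means there is nothing left to read.
        if not records and result.get("MillisBehindLatest", 0) == 0:
            break

        # Pause between polls to avoid hammering the shard.
        time.sleep(poll_interval)
```

With a real client this would be used as `for record in iter_shard_records(client, shard_iterator): ...`. Empty batches that still report a lag are skipped rather than treated as the end, since an AT_TIMESTAMP iterator can return empty pages before it reaches data.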

乜一 2025-01-19 08:16:38

You need to do something like the following:

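import base64
import json

# `decompress` is not defined in this answer; which function to use depends on
# how the producer compressed the payload (gzip.decompress is one possibility).
base64_str = base64.b64encode(record["Data"]).decode("utf-8")

data = json.loads(decompress(base64.b64decode(base64_str)))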
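
Which decoding step applies depends on what the payload actually contains, and the thread does not settle that. boto3 already returns record["Data"] as raw bytes, so one way to find out is to look at the leading bytes before choosing a parser. The sketch below is a diagnostic aid, not a confirmed fix: sniff_payload is a made-up helper name, and it only checks three well-known signatures (the Amazon Ion binary version marker, the KPL aggregated-record magic number, and the gzip header). The sample in the question starts with \xe0\x01\x00\xea, which is the Ion marker, while the ó (0xF3) in the error message matches the first byte of the KPL magic number; whether the failing record was in fact KPL-aggregated is an inference.

```python
ION_BVM = b"\xe0\x01\x00\xea"    # Amazon Ion binary version marker
KPL_MAGIC = b"\xf3\x89\x9a\xc2"  # Kinesis Producer Library aggregated record
GZIP_MAGIC = b"\x1f\x8b"         # gzip stream header


def sniff_payload(data: bytes) -> str:
    """Guess the encoding of a Kinesis record payload from its first bytes."""
    if data.startswith(ION_BVM):
        return "ion-binary"
    if data.startswith(KPL_MAGIC):
        return "kpl-aggregated"
    if data.startswith(GZIP_MAGIC):
        return "gzip"
    return "unknown"
```

A payload reported as "ion-binary" can be handed to an Ion parser directly; one reported as "kpl-aggregated" would first need a deaggregation step (the aws_kinesis_agg package is one option) before the inner records are parsed; "gzip" is the case where a decompress-then-json.loads approach like the one above could apply.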