How do I import a very large CSV into DynamoDB?

Posted 2025-02-05 17:52:53


So I have a very large CSV file (2 million+ rows) in my S3 bucket and I want to import it into DynamoDB.

What I tried:

  1. Lambda
    I managed to get the Lambda function to work, but only around 120k rows had been imported into DDB by the time my function timed out.

  2. Pipeline
    When using a pipeline, it got stuck on "waiting for runner" and then stopped completely.


Comments (2)

谁的新欢旧爱 2025-02-12 17:52:53


Here's a serverless approach to process the large .csv in small chunks with 2 Lambdas and an SQS queue (a rough sketch of the Writer Lambda follows the list):

  1. Using a one-off Reader Lambda, extract the primary key information for all records using S3 Select SQL, SELECT s.primary_key FROM S3Object s, querying the .csv in place. See the SelectObjectContent API for details.
  2. The Reader Lambda puts the primary keys into an SQS queue. Add a Dead Letter Queue to capture errors.
  3. Add the queue as the Writer Lambda's event source. Enable batching. Limit concurrency if desired.
  4. Parallel Writer Lambda invocations each fetch the records for their batch of primary keys from the .csv using S3 Select: SELECT * FROM S3Object s WHERE s.primary_key IN ('id1', 'id2', 'id3')
  5. The Writer Lambda writes its batch of records to the DynamoDB table.
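
A minimal sketch of what the Writer Lambda (steps 4 and 5) might look like in Python with boto3. The bucket, object key, table name, the primary_key column, and the assumption that each SQS message body is a JSON list of keys are placeholders for illustration, not details from the original answer:

import json
import boto3

s3 = boto3.client("s3")
# Assumed DynamoDB table name; replace with your own.
table = boto3.resource("dynamodb").Table("MyTable")

BUCKET = "my-bucket"          # assumed bucket holding the large CSV
KEY = "data/large-file.csv"   # assumed object key

def handler(event, context):
    # Each SQS record body is assumed to be a JSON list of primary keys
    # emitted by the Reader Lambda.
    keys = [k for record in event["Records"] for k in json.loads(record["body"])]
    id_list = ", ".join(f"'{k}'" for k in keys)

    # Fetch only the matching rows, querying the CSV in place with S3 Select.
    resp = s3.select_object_content(
        Bucket=BUCKET,
        Key=KEY,
        ExpressionType="SQL",
        Expression=f"SELECT * FROM S3Object s WHERE s.primary_key IN ({id_list})",
        InputSerialization={"CSV": {"FileHeaderInfo": "USE"}},
        OutputSerialization={"JSON": {}},
    )

    # S3 Select returns an event stream; collect the newline-delimited JSON records.
    payload = b"".join(
        ev["Records"]["Payload"] for ev in resp["Payload"] if "Records" in ev
    )
    rows = [json.loads(line) for line in payload.decode("utf-8").splitlines() if line.strip()]

    # batch_writer wraps BatchWriteItem and handles chunking and retrying unprocessed items.
    with table.batch_writer() as batch:
        for row in rows:
            batch.put_item(Item=row)
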
一桥轻雨一伞开 2025-02-12 17:52:53


You could set up external EMR tables (or maybe Athena, so you wouldn't need an EMR cluster), one for the S3 files and one for the DynamoDB table, using the DynamoDBStorageHandler connector. It supports copying data from DynamoDB to S3, and also from S3 to DynamoDB, just by running INSERTs and SELECTs between the tables.

An example of setting up the external S3 file table would be:

CREATE EXTERNAL TABLE s3_features_csv
    (feature_id       BIGINT,
    feature_name      STRING,
    feature_class     STRING,
    state_alpha       STRING,
    prim_lat_dec      DOUBLE,
    prim_long_dec     DOUBLE,
    elev_in_ft        BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://<your_bucket>/<prefix_of_folder_containing_files>';

And to set up the DynamoDB external table:

CREATE EXTERNAL TABLE ddb_features
    (feature_id   BIGINT,
    feature_name  STRING,
    feature_class STRING,
    state_alpha   STRING,
    prim_lat_dec  DOUBLE,
    prim_long_dec DOUBLE,
    elev_in_ft    BIGINT)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES(
    "dynamodb.table.name" = "Features",
    "dynamodb.column.mapping"="feature_id:Id,feature_name:Name,feature_class:Class,state_alpha:State,prim_lat_dec:Latitude,prim_long_dec:Longitude,elev_in_ft:Elevation"
);

And then to copy from S3 to DynamoDB:

INSERT OVERWRITE TABLE ddb_features
SELECT
    feature_id,
    feature_name,
    feature_class,
    state_alpha,
    prim_lat_dec,
    prim_long_dec,
    elev_in_ft
FROM s3_features_csv;

The OVERWRITE makes it so that any conflicting records in DynamoDB (with the same PK and SK) get overwritten by the new data being inserted.
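
One practical note for a load of this size (not part of the original answer): the EMR DynamoDB connector throttles its writes against the table's provisioned write capacity, and the share it is allowed to consume can be raised from Hive before running the INSERT. The value below is only illustrative:

-- Allow the bulk load to use up to 100% of the table's provisioned
-- write capacity (the default ratio is lower); tune as needed.
SET dynamodb.throughput.write.percent = 1.0;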
