使用 AWS Glue 在两个 S3 存储桶之间加载数据时如何更新数据?

发布于 2025-01-19 11:26:51 字数 1314 浏览 4 评论 0原文

这是我的第一个数据分析项目,我正在 AWS 上开发数据管道,管道步骤应如下所示:

  1. 以 parquet 格式将数据从 RDS 导出到 S3(完成)。
  2. 使用 Athena 查询 S3 中的数据(完成)。
  3. 更新无效数据并将其传输到新的 S3 存储桶。
  4. 从第二个S3存储桶中查询清理后的数据。

我陷入了步骤 3,在这一步中,验证团队必须通过从第一个 S3 存储桶查询数据、更新无效数据并将其复制到第二个 S3 存储桶来过滤有效数据并更新无效数据。

一个考虑因素是旧的无效数据必须按原样保留在第一个存储桶中,更新过程必须在传输过程中发生。

有没有办法在 Athena 中使用 UPDATE 语句?

我们可以在 AWS Glue 作业中传递所需的 UPDATE 语句吗?

我尝试使用此 问题 中的以下脚本来更新数据:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database='{{database}}',table_name='{{table_name}}'
df = dyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql("{{sql_query_on_above_created_table}}")
df.format('parquet').save('{{s3_location}}')

但我得到了这个错误:

SyntaxError: invalid syntax (Untitled job.py, line 7)

我读到了有关将 AWS Data Pipeline 与 EMR 结合使用的信息,但我认为它很复杂,我无法想象它如何提供按需查询服务?

在两个 S3 存储桶之间传输数据并保持旧版本不变并将新数据放入新的 S3 存储桶中时更新数据的最佳解决方案是什么?

This is my first data analytics project and I'm working on a data pipeline on AWS, the pipeline steps should be as follow:

  1. Export data from RDS to S3 in parquet format (Done).
  2. Query data in S3 using Athena (Done).
  3. Update the invalid data and transfer it to a new S3 Bucket.
  4. Query the cleaned data from the second S3 bucket.

I'm stuck in step 3, in this step, a validation team must filter the valid data and update the invalid data by querying the data from the first S3 bucket, updating the invalid data and copying it to the second S3 bucket.

One consideration is that the old invalid data must remain as is in the first bucket, the updating process must occur while transferring process.

Is there a way to use the UPDATE statement in Athena?

Could we pass the required UPDATE statement in an AWS Glue job?

I tried the following script from this question to update the data:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database='{{database}}',table_name='{{table_name}}'
df = dyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql("{{sql_query_on_above_created_table}}")
df.format('parquet').save('{{s3_location}}')

But I got This Error:

SyntaxError: invalid syntax (Untitled job.py, line 7)

I read about using AWS Data Pipeline with EMR, but I think it's complicated and I can't imagine how it can serve on-demand queries?

What is the best solution to update the data while transferring it between two S3 buckets and keeping the old version as is and putting the new data in a new S3 bucket??

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

病毒体 2025-01-26 11:26:52

首先想到了第3步的几个选项

,雅典娜没有更新查询,但您可能会使用 ctas 查询

CREATE TABLE new_table
WITH (
      format = 'Parquet',
      external_location = 's3://my-other-bucket/'),
      write_compression = 'SNAPPY')
AS SELECT *, plusSomeTransformations
FROM existing_table;

或用胶水像您在问题中一样

from pyspark.context import SparkContext
from awsglue.context import GlueContext
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database='myDB',table_name='existing_table'
df = dyF.toDF()
df.registerTempTable('myTable')
df = sqlContext.sql("SELECT *, plusSomeTransformations FROM myTable")
df.format('parquet').save('s3://my-other-bucket/')

A couple of options come to mind for Step 3

First, Athena doesn't have Update queries but you could likely use a CTAS query

CREATE TABLE new_table
WITH (
      format = 'Parquet',
      external_location = 's3://my-other-bucket/'),
      write_compression = 'SNAPPY')
AS SELECT *, plusSomeTransformations
FROM existing_table;

or with Glue like you eluded to in your question

from pyspark.context import SparkContext
from awsglue.context import GlueContext
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database='myDB',table_name='existing_table'
df = dyF.toDF()
df.registerTempTable('myTable')
df = sqlContext.sql("SELECT *, plusSomeTransformations FROM myTable")
df.format('parquet').save('s3://my-other-bucket/')
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文