Performance of PySpark DataFrames vs. Glue DynamicFrames

Posted 2025-01-22 19:14:58


So I recently started using Glue and PySpark for the first time. The task was to create a Glue job that does the following:

  1. Load data from parquet files residing in an S3 bucket
  2. Apply a filter to the data
  3. Add a column, the value of which is derived from 2 other columns
  4. Write the result to S3

Since the data is going from S3 to S3, I assumed that Glue DynamicFrames should be a decent fit for this, and I came up with the following code:

def AddColumn(r):
    # Derive option_code_derived from option_type and option_code_4
    if r["option_type"] == 'S':
        r["option_code_derived"] = 'S' + r["option_code_4"]
    elif r["option_type"] == 'P':
        r["option_code_derived"] = 'F' + r["option_code_4"][1:]
    elif r["option_type"] == 'L':
        r["option_code_derived"] = 'P' + r["option_code_4"]
    else:
        r["option_code_derived"] = None

    return r

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import Filter, Map, ApplyMapping

glueContext = GlueContext(create_spark_context(role_arn=args['role_arn']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [source_path], "recurse" : True}, format = source_format, additional_options = {"useS3ListImplementation":True})

filtered_gdf = Filter.apply(frame = inputGDF, f = lambda x: x["my_filter_column"] in ['50','80'])

additional_column_gdf = Map.apply(frame = filtered_gdf, f = AddColumn) 

gdf_mapped = ApplyMapping.apply(frame = additional_column_gdf, mappings = mappings, transformation_ctx = "gdf_mapped") 

glueContext.purge_s3_path(full_target_path_purge, {"retentionPeriod": 0})

outputGDF = glueContext.write_dynamic_frame.from_options(frame = gdf_mapped, connection_type = "s3", connection_options = {"path": full_target_path}, format = target_format)

This works, but it takes a very long time (just short of 10 hours with 20 G.1X workers).
Now, the dataset is quite large (almost 2 billion records, over 400 GB), but this was still unexpected (to me at least).

Then I gave it another try, this time with PySpark DataFrames instead of DynamicFrames.
The code looks like the following:

from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import when, concat, lit

glueContext = GlueContext(create_spark_context(role_arn=args['role_arn'], source_bucket=args['s3_source_bucket'], target_bucket=args['s3_target_bucket']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.parquet(full_source_path)

df_filtered = df.filter( (df.model_key_status  == '50') | (df.model_key_status  == '80') )

df_derived = df_filtered.withColumn('option_code_derived', 
     when(df_filtered.option_type == "S", concat(lit('S'), df_filtered.option_code_4))
    .when(df_filtered.option_type == "P", concat(lit('F'), df_filtered.option_code_4[2:42]))
    .when(df_filtered.option_type == "L", concat(lit('P'), df_filtered.option_code_4))
    .otherwise(None))

glueContext.purge_s3_path(full_purge_path, {"retentionPeriod": 0})

df_reorderered = df_derived.select(target_columns)

df_reorderered.write.parquet(full_target_path, mode="overwrite")

This also works, but with otherwise identical settings (20 G.1X workers, same dataset) it takes less than 20 minutes.

My question is: Where does this massive difference in performance between DynamicFrames and DataFrames come from? Was I doing something fundamentally wrong in the first try?


Comments (1)

森罗 2025-01-29 19:14:58


Not sure if this is the answer, but the AWS Glue documentation explains how a DynamicFrame handles schema; as I understand it, it is a wrapper around a DataFrame that carries schema metadata for each record:

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially
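
If that is indeed the cause, the expensive part in the first job is probably Filter.apply and Map.apply, which run a Python function against every one of the ~2 billion records, whereas the DataFrame version expresses the same logic as column expressions that Spark evaluates inside the JVM. As a rough sketch (assuming the same job setup and variables as in the question; toDF() and DynamicFrame.fromDF() are standard parts of the awsglue API), you could keep the DynamicFrame reader and writer and still push the filter and the derived column down to DataFrame operations:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import when, concat, lit

# Read with a DynamicFrame as before (glueContext, source_path, source_format
# etc. are the same variables as in the question).
inputGDF = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [source_path], "recurse": True},
    format=source_format)

# Unwrap into a plain Spark DataFrame so the filter and the derived column
# become column expressions evaluated in the JVM instead of a Python call
# per record.
df = inputGDF.toDF()
df_derived = (df
    .filter(df["my_filter_column"].isin('50', '80'))
    .withColumn('option_code_derived',
        when(df.option_type == 'S', concat(lit('S'), df.option_code_4))
        .when(df.option_type == 'P', concat(lit('F'), df.option_code_4.substr(2, 42)))
        .when(df.option_type == 'L', concat(lit('P'), df.option_code_4))
        .otherwise(None)))

# Wrap the result back into a DynamicFrame only for the Glue writer.
outputGDF = DynamicFrame.fromDF(df_derived, glueContext, "outputGDF")
glueContext.write_dynamic_frame.from_options(
    frame=outputGDF,
    connection_type="s3",
    connection_options={"path": full_target_path},
    format=target_format)

The ApplyMapping and purge steps are left out of this sketch; the same column selection and renaming can be done on the DataFrame with select/alias before converting back.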
