Performance of PySpark DataFrames vs. Glue DynamicFrames
So I recently started using Glue and PySpark for the first time. The task was to create a Glue job that does the following:
- Load data from parquet files residing in an S3 bucket
- Apply a filter to the data
- Add a column, the value of which is derived from 2 other columns
- Write the result to S3
Since the data is going from S3 to S3, I assumed that Glue DynamicFrames should be a decent fit for this, and I came up with the following code:
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping, Filter, Map

def AddColumn(r):
    # Derive option_code_derived from option_type and option_code_4.
    if r["option_type"] == 'S':
        r["option_code_derived"] = 'S' + r["option_code_4"]
    elif r["option_type"] == 'P':
        # Replace the first character of option_code_4 with 'F'.
        r["option_code_derived"] = 'F' + r["option_code_4"][1:]
    elif r["option_type"] == 'L':
        r["option_code_derived"] = 'P' + r["option_code_4"]
    else:
        r["option_code_derived"] = None
    return r

# create_spark_context is presumably a helper defined elsewhere in the job (not shown).
glueContext = GlueContext(create_spark_context(role_arn=args['role_arn']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [source_path], "recurse" : True}, format = source_format, additional_options = {"useS3ListImplementation":True})
# Both the filter and the map run a Python function once per record.
filtered_gdf = Filter.apply(frame = inputGDF, f = lambda x: x["my_filter_column"] in ['50','80'])
additional_column_gdf = Map.apply(frame = filtered_gdf, f = AddColumn)
gdf_mapped = ApplyMapping.apply(frame = additional_column_gdf, mappings = mappings, transformation_ctx = "gdf_mapped")

# Empty the target prefix before writing the result.
glueContext.purge_s3_path(full_target_path_purge, {"retentionPeriod": 0})
outputGDF = glueContext.write_dynamic_frame.from_options(frame = gdf_mapped, connection_type = "s3", connection_options = {"path": full_target_path}, format = target_format)
This works but takes a very long time (just short of 10 hours with 20 G.1X workers).
Now, the dataset is quite large (almost 2 billion records, over 400 GB), but this was still unexpected (to me at least).
Then I gave it another try, this time with PySpark DataFrames instead of DynamicFrames.
The code looks like the following:
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import concat, lit, when

# create_spark_context is presumably a helper defined elsewhere in the job (not shown).
glueContext = GlueContext(create_spark_context(role_arn=args['role_arn'], source_bucket=args['s3_source_bucket'], target_bucket=args['s3_target_bucket']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.parquet(full_source_path)
df_filtered = df.filter( (df.model_key_status == '50') | (df.model_key_status == '80') )
df_derived = df_filtered.withColumn('option_code_derived',
    when(df_filtered.option_type == "S", concat(lit('S'), df_filtered.option_code_4))
    # Slicing a Column maps to substr(startPos, length): start at position 2 (1-based), take up to 42 characters.
    .when(df_filtered.option_type == "P", concat(lit('F'), df_filtered.option_code_4[2:42]))
    .when(df_filtered.option_type == "L", concat(lit('P'), df_filtered.option_code_4))
    .otherwise(None))

# Empty the target prefix before writing the result.
glueContext.purge_s3_path(full_purge_path, {"retentionPeriod": 0})
df_reordered = df_derived.select(target_columns)
df_reordered.write.parquet(full_target_path, mode="overwrite")
This also works, but with otherwise identical settings (20 workers of type G.1X, same dataset), it takes less than 20 minutes.
My question is: Where does this massive difference in performance between DynamicFrames and DataFrames come from? Was I doing something fundamentally wrong in the first try?
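When comparing the two jobs, it may also be worth confirming that they derive the same value. The DynamicFrame version uses the Python slice `[1:]`, while the DataFrame version uses `option_code_4[2:42]`, which PySpark treats as `substr(2, 42)` (1-based start, then a length). The sketch below models both in plain Python; `derive_python`, `spark_substr`, and `derive_spark_like` are hypothetical names made up for this illustration and are not part of either job.

```python
def derive_python(option_type, option_code_4):
    """Mirror of AddColumn from the DynamicFrame version."""
    if option_type == "S":
        return "S" + option_code_4
    if option_type == "P":
        return "F" + option_code_4[1:]
    if option_type == "L":
        return "P" + option_code_4
    return None


def spark_substr(value, start_pos, length):
    """Plain-Python model of Column.substr(startPos, length) for start_pos >= 1."""
    begin = start_pos - 1
    return value[begin:begin + length]


def derive_spark_like(option_type, option_code_4):
    """Mirror of the when/otherwise chain; [2:42] is modelled as substr(2, 42)."""
    if option_type == "S":
        return "S" + option_code_4
    if option_type == "P":
        return "F" + spark_substr(option_code_4, 2, 42)
    if option_type == "L":
        return "P" + option_code_4
    return None


# Short codes: both variants agree.
short_code = "ABCDE"
same_for_short = derive_python("P", short_code) == derive_spark_like("P", short_code)

# Codes longer than 43 characters: the Spark-style variant truncates.
long_code = "A" + "x" * 50
same_for_long = derive_python("P", long_code) == derive_spark_like("P", long_code)
```

Under this model the two variants only diverge for `option_code_4` values longer than 43 characters, so for shorter codes the outputs of the two jobs should be directly comparable.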
Comments (1)
Not sure if it's the answer, but here they explain that a DynamicFrame handles the schema itself. As I understand it, it is a wrapper around a DataFrame with some sort of metadata for each record.
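To make the "metadata for each record" idea a bit more concrete, here is a toy model in plain Python. It is not how Glue implements DynamicFrames; `record_schema`, `merged_schema`, and the sample records are made up for illustration. The only assumption taken from the AWS docs is that DynamicFrame records are self-describing, so the frame's schema is worked out from the records instead of being fixed up front.

```python
from collections import defaultdict


def record_schema(record):
    """Describe one record on its own: field name -> type name."""
    return {field: type(value).__name__ for field, value in record.items()}


def merged_schema(records):
    """Combine per-record descriptions into one schema for the whole set.

    A field that shows up with more than one type keeps all of them,
    loosely similar to a choice between types.
    """
    seen = defaultdict(set)
    for record in records:
        for field, type_name in record_schema(record).items():
            seen[field].add(type_name)
    return {field: sorted(types) for field, types in seen.items()}


# Hypothetical sample data: the second record has an int where a str is expected,
# and the third record is missing option_code_4 altogether.
records = [
    {"option_type": "S", "option_code_4": "A123"},
    {"option_type": "P", "option_code_4": 4711},
    {"option_type": "L"},
]

schema = merged_schema(records)
```

In this toy version every record has to be inspected individually, whereas a frame with a single fixed schema can skip that step. Whether this accounts for much of the gap in the question is not something the sketch can show.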