AWS Glue not giving consistent results for PySpark orderBy

Posted 2025-01-12 08:10:28

When running PySpark locally I get correct results, with the list ordered by BOOK_ID, but when deploying the AWS Glue job the books do not seem to be ordered.

root
 |-- AUTHORID: integer
 |-- NAME: string 
 |-- BOOK_LIST: array 
 |    |-- BOOK_ID: integer 
 |    |-- BOOK_NAME: string 
    from pyspark.sql import functions as F

    # Join authors to their books, sort by BOOK_ID descending, then collect
    # each author's books into a list per author
    result = (df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
              .orderBy(F.col("BOOK_ID").desc())
              .groupBy("AUTHOR_ID", "NAME")
              .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")))
              )

Note: I'm using PySpark 3.2.1 and Glue 2.0

Any suggestions, please?

2 Answers

抹茶夏天i‖ 2025-01-19 08:10:28

Supposition

Although I managed to run the job on Glue 3.0, which supports Spark 3.1, the orderBy still gives the wrong result.

Migrating from AWS Glue 2.0 to AWS Glue 3.0

The solution that seems to give a good result is to reduce the number of workers to 2, which is the minimum allowed number of workers.

The explanation is that a Glue job may have many workers running in parallel, so the orderBy cannot give a correct result, in contrast to the case where we have only one worker.
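
To illustrate this point, here is a minimal sketch (an editorial illustration, not code from the original answer; it assumes an active SparkSession named spark): a globally sorted DataFrame loses its ordering as soon as the data is shuffled again, which is effectively what happens when the groupBy runs across many partitions/workers.

    from pyspark.sql import functions as F

    sorted_df = spark.range(100).orderBy(F.col("id").desc())  # globally sorted, descending
    shuffled = sorted_df.repartition(8)                       # simulate many partitions / workers
    # Collecting after the shuffle gives no ordering guarantee: the list is
    # generally no longer descending.
    shuffled.groupBy(F.lit(1)).agg(F.collect_list("id")).show(truncate=False)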

Suggested solutions

  • Use the minimum number of workers (which is not a pertinent solution)
  • Apply the .orderBy to each DataFrame before the join
  • Or use .coalesce(1), as in the snippet below:
    # As suggested above, coalesce to a single partition before sorting so that
    # the rows stay in order through the groupBy
    result = (df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
              .coalesce(1)
              .orderBy(F.col("BOOK_ID").desc())
              .groupBy("AUTHOR_ID", "NAME")
              .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")))
              )

This allows us to get the right result, but in this case we lose performance.
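
A possible alternative, offered here as an editorial sketch rather than as part of the original answer: instead of relying on a pre-groupBy orderBy, sort the collected array itself with sort_array. This assumes BOOK_ID is the first field of the struct (so it drives the comparison) and avoids forcing everything onto a single partition.

    from pyspark.sql import functions as F

    result = (df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
              .groupBy("AUTHOR_ID", "NAME")
              .agg(F.sort_array(
                       F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")),
                       asc=False  # descending by BOOK_ID, matching the original intent
                   ).alias("BOOK_LIST"))
              )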

零度° 2025-01-19 08:10:28

I'm trying to simplify the issue; work with me.

Let's create a sample DataFrame:

>>> df = spark.createDataFrame([
    {"book_id": 1, "author_id": 1, "name": "David", "book_name": "Kill Bill"},
    {"book_id": 2, "author_id": 2, "name": "Roman", "book_name": "Dying is Hard"},
    {"book_id": 3, "author_id": 3, "name": "Moshe", "book_name": "Apache Kafka The Easy Way"},
    {"book_id": 4, "author_id": 1, "name": "David", "book_name": "Pyspark Is Awesome"},
    {"book_id": 5, "author_id": 2, "name": "Roman", "book_name": "Playing a Piano"},
    {"book_id": 6, "author_id": 3, "name": "Moshe", "book_name": "Awesome Scala"}
 ])

Now, doing this:

from pyspark.sql import functions as F

(
    df
    .groupBy("author_id", "name")
    .agg(F.collect_list(F.struct("book_id", "book_name")).alias("data"),
         F.sum("book_id").alias("sorted_key"))
    .orderBy(F.col("sorted_key").desc())
    .drop("sorted_key")
    .show(10, False)
)

I'm getting exactly what you are allegedly asking for:

+---------+-----+----------------------------------------------------+
|author_id|name |collect_list(struct(book_id, book_name))            |
+---------+-----+----------------------------------------------------+
|3        |Moshe|[{3, Apache Kafka The Easy Way}, {6, Awesome Scala}]|
|2        |Roman|[{2, Dying is Hard}, {5, Playing a Piano}]          |
|1        |David|[{1, Kill Bill}, {4, Pyspark Is Awesome}]           |
+---------+-----+----------------------------------------------------+
