Pyspark 多个列表中每个元素的平均值

发布于 2025-01-11 16:58:34 字数 484 浏览 0 评论 0原文

我有一个包含 2 列的 df:

  • id
  • 矢量

这是它的外观示例:

+--------------------+----------+
|              vector|        id|
+--------------------+----------+
|[8.32,3.22,5.34,6.5]|1046091128|
|[8.52,3.34,5.31,6.3]|1046091128|
|[8.44,3.62,5.54,6.4]|1046091128|
|[8.31,3.12,5.21,6.1]|1046091128|
+--------------------+----------+

我想要 groupBy appid 并取向量每个元素的平均值。例如,聚合列表中的第一个值将是 (8.32+8.52+8.44+8.31)/4 等等。

任何帮助表示赞赏。

I have a df with 2 columns:

  • id
  • vector

This is a sample of how it looks:

+--------------------+----------+
|              vector|        id|
+--------------------+----------+
|[8.32,3.22,5.34,6.5]|1046091128|
|[8.52,3.34,5.31,6.3]|1046091128|
|[8.44,3.62,5.54,6.4]|1046091128|
|[8.31,3.12,5.21,6.1]|1046091128|
+--------------------+----------+

I want to groupBy appid and take the mean of each element of the vectors. So for example the first value in the aggregated list will be (8.32+8.52+8.44+8.31)/4 and so on.

Any help is appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

黎夕旧梦 2025-01-18 16:58:34

这假设您知道数组列的长度:

l = 4 #size of array column
df1 = df.select("id",*[F.col("vector")[i] for i in range(l)])
out = df1.groupby("id").agg(F.array([F.mean(i) 
                            for i in df1.columns[1:]]).alias("vector"))

out.show(truncate=False)

+----------+----------------------------------------+
|id        |vector                                  |
+----------+----------------------------------------+
|1046091128|[8.3975, 3.325, 5.35, 6.325000000000001]|
+----------+----------------------------------------

This assumes that you know the length of the array column:

l = 4 #size of array column
df1 = df.select("id",*[F.col("vector")[i] for i in range(l)])
out = df1.groupby("id").agg(F.array([F.mean(i) 
                            for i in df1.columns[1:]]).alias("vector"))

out.show(truncate=False)

+----------+----------------------------------------+
|id        |vector                                  |
+----------+----------------------------------------+
|1046091128|[8.3975, 3.325, 5.35, 6.325000000000001]|
+----------+----------------------------------------
ㄟ。诗瑗 2025-01-18 16:58:34

您可以使用 posexplode 函数,然后根据平均值聚合列。如下所示 -

from pyspark.sql.functions import  *
from pyspark.sql.types import  *

data = [([8.32,3.22,5.34,6.5], 1046091128 ), ([8.52,3.34,5.31,6.3], 1046091128), ([8.44,3.62,5.54,6.4], 1046091128), ([8.31,3.12,5.21,6.1], 1046091128)]
schema = StructType([ StructField("vector", ArrayType(FloatType())), StructField("id", IntegerType()) ])

df = spark.createDataFrame(data=data,schema=schema)

df.select("id", posexplode("vector")).groupBy("id").pivot("pos").agg(avg("col")).show()

输出看起来有点像

+----------+-----------------+------------------+-----------------+-----------------+
|        id|                0|                 1|                2|                3|
+----------+-----------------+------------------+-----------------+-----------------+
|1046091128|8.397500038146973|3.3249999284744263|5.350000023841858|6.325000047683716|
+----------+-----------------+------------------+-----------------+-----------------+

如果需要,您可以稍后重命名列。

还可以通过按 id 和 pos 分组,然后单独按 id 分组来避免枢纽,以收集列表

df.select("id", posexplode("vector")).groupby('id','pos').agg(avg('col').alias('vector')).groupby('id').agg(collect_list('vector').alias('vector')).show(truncate=False)

结果

+----------+-----------------------------------------------------------------------------+
|id        |vector                                                                       |
+----------+-----------------------------------------------------------------------------+
|1046091128|[8.397500038146973, 5.350000023841858, 3.3249999284744263, 6.325000047683716]|
+----------+-----------------------------------------------------------------------------+

You can use posexplode function and then aggregate the column based upon average. Something like below -

from pyspark.sql.functions import  *
from pyspark.sql.types import  *

data = [([8.32,3.22,5.34,6.5], 1046091128 ), ([8.52,3.34,5.31,6.3], 1046091128), ([8.44,3.62,5.54,6.4], 1046091128), ([8.31,3.12,5.21,6.1], 1046091128)]
schema = StructType([ StructField("vector", ArrayType(FloatType())), StructField("id", IntegerType()) ])

df = spark.createDataFrame(data=data,schema=schema)

df.select("id", posexplode("vector")).groupBy("id").pivot("pos").agg(avg("col")).show()

Output would look somewhat like :

+----------+-----------------+------------------+-----------------+-----------------+
|        id|                0|                 1|                2|                3|
+----------+-----------------+------------------+-----------------+-----------------+
|1046091128|8.397500038146973|3.3249999284744263|5.350000023841858|6.325000047683716|
+----------+-----------------+------------------+-----------------+-----------------+

You can rename the columns later if required.

Could also avoid pivot by grouping by id and pos and then later grouping by id alone to collect_list

df.select("id", posexplode("vector")).groupby('id','pos').agg(avg('col').alias('vector')).groupby('id').agg(collect_list('vector').alias('vector')).show(truncate=False)

Outcome

+----------+-----------------------------------------------------------------------------+
|id        |vector                                                                       |
+----------+-----------------------------------------------------------------------------+
|1046091128|[8.397500038146973, 5.350000023841858, 3.3249999284744263, 6.325000047683716]|
+----------+-----------------------------------------------------------------------------+
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文