Expression expansion in Spark Scala aggregation

Asked on 2025-02-11 18:01:23


I'm trying to convert a simple piece of aggregation code from PySpark to Scala.

The dataframes:

# PySpark
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [([10, 100],),
     ([20, 200],)],
    ['vals'])
// Scala
val df = Seq(
    (Seq(10, 100)),
    (Seq(20, 200)),
).toDF("vals")

Aggregation expansion - OK in PySpark:

df2 = df.agg(
    *[F.sum(F.col("vals")[i]).alias(f"col{i}") for i in range(2)]
)
df2.show()
# +----+----+
# |col0|col1|
# +----+----+
# |  30| 300|
# +----+----+

But in Scala...

val df2 = df.agg(
  (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i")): _*
)
         (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i")): _*
                                                              ^
On line 2: error: no `: _*` annotation allowed here
       (such annotations are only allowed in arguments to *-parameters)

The syntax seems almost the same as this select, which works well:

val df2 = df.select(
  (0 until 2).map(i => $"vals"(i).alias(s"col$i")): _*
)

Does expression expansion work in Scala Spark aggregations? How?

2 Answers

猛虎独行 2025-02-18 18:01:23


I don't fully understand why this happens in the compiler, but it seems that it is not unpacking your Seq[Column] into the varargs parameter.

As @RvdV has mentioned in his post, the signature of the method is

def agg(expr: Column, exprs: Column*): DataFrame

So a temporary solution is to unpack it manually, for example:

val seq = Seq(0, 1).map(i => sum($"vals"(i)).alias(s"col$i"))
val df2 = df.agg(seq(0), seq(1))
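
A more general form of the same workaround, when the number of columns isn't fixed, is to split the Seq[Column] into head and tail so the tail fills the varargs parameter. A minimal sketch, assuming the df and spark.implicits._ from the question are in scope and the sequence is non-empty:

import org.apache.spark.sql.functions.sum

// Build the aggregation expressions as before.
val exprs = (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i"))

// The first Column satisfies the fixed parameter; the rest expand into the varargs.
val df2 = df.agg(exprs.head, exprs.tail: _*)
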
瀞厅☆埖开 2025-02-18 18:01:23


If you look at the documentation of Dataset.agg, you see that it first has a fixed parameter and then a list of unspecified length:

def agg(expr: Column, exprs: Column*): DataFrame 

So you should first pass some other aggregation, and then for the second argument you can do the list expansion. So something like

val df2 = df.agg(
  first($"vals"), (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i")): _*
)

or any other single aggregation in front of the list should work.
I don't know why it is like this, maybe it's a Scala limitation so you can't pass an empty list and have no aggregation at all?
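
If there is no aggregation you actually want in the leading position, one variation (a sketch under the same assumptions as above, not part of the original answer) is to lead with a throwaway aggregate and drop its column afterwards:

import org.apache.spark.sql.functions.{count, lit, sum}

val exprs = (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i"))

// count(lit(1)) only satisfies the fixed first parameter of agg;
// the placeholder "_dummy" column is dropped immediately after.
val df2 = df
  .agg(count(lit(1)).alias("_dummy"), exprs: _*)
  .drop("_dummy")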
