Expression expansion in Spark Scala aggregation
I'm trying to convert some simple aggregation code from PySpark to Scala.
The dataframes:
# PySpark
from pyspark.sql import functions as F
df = spark.createDataFrame(
[([10, 100],),
([20, 200],)],
['vals'])
// Scala
import spark.implicits._  // for .toDF and the $ column interpolator
import org.apache.spark.sql.functions.sum

val df = Seq(
  Seq(10, 100),
  Seq(20, 200)
).toDF("vals")
Aggregation expansion - OK in PySpark:
df2 = df.agg(
*[F.sum(F.col("vals")[i]).alias(f"col{i}") for i in range(2)]
)
df2.show()
# +----+----+
# |col0|col1|
# +----+----+
# | 30| 300|
# +----+----+
But in Scala...
val df2 = df.agg(
  (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i")): _*
)
  (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i")): _*
                                                        ^
On line 2: error: no `: _*` annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
The syntax seems almost identical to this select, which works fine:
val df2 = df.select(
  (0 until 2).map(i => $"vals"(i).alias(s"col$i")): _*
)
Does expression expansion work in Scala Spark aggregations? How?
2 Answers
I'm not entirely sure why the compiler behaves this way, but it seems that it is not unpacking your Seq[Column] into varargs as parameters.
As @RvdV has mentioned in his post, the signature of the method is
def agg(expr: Column, exprs: Column*): DataFrame
so a temporary workaround is to unpack it manually, like this:
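A minimal sketch of that manual unpacking, reusing the df from the question (the explicit per-index calls are mine):

val df2 = df.agg(
  sum($"vals"(0)).alias("col0"),  // first expression fills the fixed `expr: Column` parameter
  sum($"vals"(1)).alias("col1")   // remaining expressions go to `exprs: Column*`
)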
If you look at the documentation of Dataset.agg, you see that it first has a fixed parameter and then a list of unspecified length:
def agg(expr: Column, exprs: Column*): DataFrame
So you should first pass one aggregation on its own, and then for the second argument you can do the list expansion. So something like this:
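A minimal sketch of that approach, splitting the question's Seq[Column] into head and tail (the exprs variable name is mine):

val exprs = (0 until 2).map(i => sum($"vals"(i)).alias(s"col$i"))
val df2 = df.agg(exprs.head, exprs.tail: _*)  // head fills the fixed parameter, tail expands into the varargs
df2.show()
// +----+----+
// |col0|col1|
// +----+----+
// |  30| 300|
// +----+----+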
Any other single aggregation in front of the list should work as well.
I don't know why it is like this; maybe the fixed first parameter is there so you can't pass an empty list and end up with no aggregation at all?
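For comparison, this would also explain why the select version in the question compiles: Dataset.select is declared with a single vararg parameter, so the : _* expansion is a legal argument there:

def select(cols: Column*): DataFrame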