PySpark SparseVectors dataframe columns .dot product, or any other vector-type column computation, using @udf or @pandas_udf

Posted 2025-01-14 06:52:52

I am trying to compute the .dot product between two columns of a given dataframe. SparseVector already has this ability in Spark, so I am trying to do it in a simple, scalable way without converting to RDDs or to DenseVectors. I am stuck: I have spent the past three days trying to find an approach, and it keeps failing, returning no computation for the two vector columns passed from the dataframe. I am looking for guidance on this, because I am missing something here and I am not sure what the root cause is.

This approach works for standalone vectors and RDD vectors, but fails when passing dataframe column vectors. To replicate the flow and the issue, see below. Ideally this computation would happen in parallel, since the real work data has billions or more rows (dataframe observations):

from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

df = spark.createDataFrame(
    [
     [["a","b","c"], SparseVector(4527, {0:0.6363067860791387, 1:1.0888040725098247, 31:4.371858972705023}), SparseVector(4527, {0:0.6363067860791387, 1:2.0888040725098247, 31:4.371858972705023})],
     [["d"], SparseVector(4527, {8: 2.729945780576634}), SparseVector(4527, {8: 4.729945780576634})],
    ], ["word", "i", "j"])

# # dataframe content
df.show()
+---------+--------------------+--------------------+
|     word|                   i|                   j|
+---------+--------------------+--------------------+
|[a, b, c]|(4527,[0,1,31],[0...|(4527,[0,1,31],[0...|
|      [d]|(4527,[8],[2.7299...|(4527,[8],[4.7299...|
+---------+--------------------+--------------------+


@udf(returnType=ArrayType(FloatType()))
def sim_cos(v1, v2):
    if v1 is not None and v2 is not None:
        return float(v1.dot(v2))

# # calling udf
df = df.withColumn("dotP", sim_cos(df.i, df.j))

# # output after udf 
df.show()
+---------+--------------------+--------------------+----------+
|     word|                   i|                   j|      dotP|
+---------+--------------------+--------------------+----------+
|[a, b, c]|(4527,[0,1,31],[0...|(4527,[0,1,31],[0...|      null|
|      [d]|(4527,[8],[2.7299...|(4527,[8],[4.7299...|      null|
+---------+--------------------+--------------------+----------+
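The null values above point at the declared return type: the udf promises ArrayType(FloatType()) while the function returns a plain float, and PySpark yields null instead of raising an error when the returned value cannot be converted to the declared type. A minimal sketch of the decorator version with a matching scalar type (same logic, only the returnType changed):

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# # scalar FloatType matches the float the function actually returns
@udf(returnType=FloatType())
def sim_cos(v1, v2):
    if v1 is not None and v2 is not None:
        return float(v1.dot(v2))

df = df.withColumn("dotP", sim_cos(df.i, df.j))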


Comments (1)

小红帽 2025-01-21 06:52:52

Rewriting the udf as a lambda works on Spark 2.4.5. Posting in case anyone is interested in this approach for PySpark dataframes:

# # rewrite udf as lambda function:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

sim_cos = F.udf(lambda x, y: float(x.dot(y)), FloatType())

# # executing udf on dataframe
df = df.withColumn("similarity", sim_cos(col("i"), col("j")))

# # end result
df.show()

+---------+--------------------+--------------------+----------+
|     word|                   i|                   j|similarity|
+---------+--------------------+--------------------+----------+
|[a, b, c]|(4527,[0,1,31],[0...|(4527,[0,1,31],[0...| 21.792336|
|      [d]|(4527,[8],[2.7299...|(4527,[8],[4.7299...| 12.912496|
+---------+--------------------+--------------------+----------+
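As a side note, the new column is named similarity, but the value is a raw dot product. True cosine similarity divides by the vector norms; a minimal sketch, assuming pyspark.ml.linalg vectors (which expose .norm(p)) and non-zero vectors (a zero vector would need a guard against division by zero):

# # cosine similarity = dot product / (L2 norm of x * L2 norm of y)
cos_sim = F.udf(
    lambda x, y: float(x.dot(y) / (x.norm(2) * y.norm(2))),
    FloatType())

df = df.withColumn("cosine", cos_sim(col("i"), col("j")))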

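On the @pandas_udf part of the title: Arrow does not support the VectorUDT type, so a pandas UDF cannot receive SparseVector columns directly. A sketch of one possible workaround, assuming Spark 3.0+ for pyspark.ml.functions.vector_to_array; note that it densifies the 4527-dimensional vectors, which costs memory at billions of rows:

import numpy as np
import pandas as pd
from pyspark.ml.functions import vector_to_array  # Spark 3.0+
from pyspark.sql.functions import pandas_udf, col

@pandas_udf("double")
def dot_pandas(xs: pd.Series, ys: pd.Series) -> pd.Series:
    # after vector_to_array, each element arrives as a numpy array
    return pd.Series([float(np.dot(x, y)) for x, y in zip(xs, ys)])

df = (df
      .withColumn("i_arr", vector_to_array(col("i")))
      .withColumn("j_arr", vector_to_array(col("j")))
      .withColumn("dotP", dot_pandas(col("i_arr"), col("j_arr"))))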