Euclidean distance or cosine similarity between vector columns

Published 2025-02-03 04:12:47

I have a Spark dataframe in the following form:

> df1
+---------------+----------------+
|        vector1|         vector2|  
+---------------+----------------+
|[[0.9,0.5,0.2]]| [[0.1,0.3,0.2]]|
|[[0.8,0.7,0.1]]| [[0.8,0.4,0.2]]|
|[[0.9,0.2,0.8]]| [[0.3,0.1,0.8]]|
+---------------+----------------+

> df1.printSchema()
root
 |-- vector1: array (nullable = true)
 |    |-- element: vector (containsNull = true)
 |-- vector2: array (nullable = true)
 |    |-- element: vector (containsNull = true)

I need to calculate the Euclidean distance or cosine similarity between the vector1 and vector2 columns.
How can I do this using PySpark?

Comments (2)

GRAY°灰色天空 2025-02-10 04:12:47

When the columns are of array type, the distance can be computed with built-in higher-order functions: arrays_zip pairs up corresponding elements, transform squares each pairwise difference, and aggregate sums the squares and takes the square root (F.aggregate requires Spark 3.1+):

distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: (x['vector1'] - x['vector2'])**2
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)

Full test:

from pyspark.sql import functions as F
df1 = spark.createDataFrame(
    [([0.9, 0.5, 0.2], [0.1, 0.3, 0.2]),
     ([0.8, 0.7, 0.1], [0.8, 0.4, 0.2]),
     ([0.9, 0.2, 0.8], [0.3, 0.1, 0.8])],
    ['vector1', 'vector2']
)
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: (x['vector1'] - x['vector2'])**2
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)
df2 = df1.withColumn('euclidean_distance', distance)

df2.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1        |vector2        |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822  |
# +---------------+---------------+-------------------+
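
As a side note, zip_with pairs the two arrays directly, so it should be able to replace the arrays_zip + transform combination (a sketch, not covered by the test above):

distance = F.aggregate(
    F.zip_with('vector1', 'vector2', lambda a, b: (a - b)**2),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)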

If the columns are of vector type, I would first convert them to arrays. Since your schema is an array of vectors, element_at picks out the (single) vector and vector_to_array turns it into a plain array:

df2 = df1.select(
    vector_to_array(F.element_at('vector1', 1)).alias('vector1'),
    vector_to_array(F.element_at('vector2', 1)).alias('vector2'),
)

Full test:

from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array
df1 = spark.createDataFrame(
    [([Vectors.dense(0.9, 0.5, 0.2)], [Vectors.dense(0.1, 0.3, 0.2)]),
     ([Vectors.dense(0.8, 0.7, 0.1)], [Vectors.dense(0.8, 0.4, 0.2)]),
     ([Vectors.dense(0.9, 0.2, 0.8)], [Vectors.dense(0.3, 0.1, 0.8)])],
    ['vector1', 'vector2']
)
df2 = df1.select(
    vector_to_array(F.element_at('vector1', 1)).alias('vector1'),
    vector_to_array(F.element_at('vector2', 1)).alias('vector2'),
)
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: (x['vector1'] - x['vector2'])**2
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)
df3 = df2.withColumn('euclidean_distance', distance)

df3.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1        |vector2        |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822  |
# +---------------+---------------+-------------------+
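
The question also asks for cosine similarity; the same higher-order-function pattern should extend to it. A minimal sketch, assuming the same array-type df1 as in the first test (dot product divided by the product of the norms):

# dot product of the two arrays
dot = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: x['vector1'] * x['vector2']
    ),
    F.lit(0.0),
    lambda acc, x: acc + x
)
# Euclidean norm of each array
norm1 = F.aggregate(F.transform('vector1', lambda v: v * v),
                    F.lit(0.0), lambda acc, x: acc + x, lambda x: x**.5)
norm2 = F.aggregate(F.transform('vector2', lambda v: v * v),
                    F.lit(0.0), lambda acc, x: acc + x, lambda x: x**.5)
df2 = df1.withColumn('cosine_similarity', dot / (norm1 * norm2))
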
凯凯我们等你回来 2025-02-10 04:12:47

Let's try a pandas UDF via mapInPandas; it's vectorised and faster.

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import col, flatten, lit
from sklearn.metrics.pairwise import paired_distances

df = spark.createDataFrame(
    [([[0.9, 0.5, 0.2]], [[0.1, 0.3, 0.2]]),
     ([[0.8, 0.7, 0.1]], [[0.8, 0.4, 0.2]]),
     ([[0.9, 0.2, 0.8]], [[0.3, 0.1, 0.8]])],
    ('vector1', 'vector2')
)

# flatten the nested arrays so each column is a plain array of doubles
df1 = df.select(*[flatten(col(x)).alias(x) for x in df.columns])

# output schema: the input columns plus a double column 'v' for the distance
sch = df1.withColumn('v', lit(90.087654623)).schema

# pandas function: sklearn's paired_distances computes the row-wise
# Euclidean distance between the two list columns
def Eucl(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf.assign(v=paired_distances(pdf['vector1'].to_list(),
                                            pdf['vector2'].to_list()))

df1.mapInPandas(Eucl, schema=sch).show()

Outcome:

+---------------+---------------+-------------------+
|        vector1|        vector2|                  v|
+---------------+---------------+-------------------+
|[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]| 0.8246211251235323|
|[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
|[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|  0.608276253029822|
+---------------+---------------+-------------------+
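
If cosine similarity is wanted instead, the same mapInPandas pattern should carry over with sklearn's paired_cosine_distances, which returns the cosine distance (i.e. 1 - similarity). A minimal sketch reusing df1 and sch from above:

from sklearn.metrics.pairwise import paired_cosine_distances

def Cos(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        # cosine similarity = 1 - cosine distance
        yield pdf.assign(v=1 - paired_cosine_distances(pdf['vector1'].to_list(),
                                                       pdf['vector2'].to_list()))

df1.mapInPandas(Cos, schema=sch).show()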