Add a column in PySpark as a function of two other columns

Posted on 2025-02-11 04:46:10


I have two columns in a data frame df in PySpark:

| features |  center  |
+----------+----------+
| [0,1,0]  | [1.5,2,1]|
| [5,7,6]  | [10,7,7] |

I want to create a function which calculates the Euclidean distance between df['features'] and df['center'] and map it to a new column in df, distance.

Let's say our function looks like the following:

import numpy as np
from pyspark.sql.functions import udf

@udf
def dist(feat, cent):
    return np.linalg.norm(feat-cent)

How would I actually apply this to do what I want it to do? I was trying things like

df.withColumn("distance", dist(col("features"), col("center"))).show()

but that gives me the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 869.0 failed 4 times, most recent failure: Lost task 0.3 in stage 869.0 (TID 26423) (10.50.91.134 executor 35): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

I am really struggling with understanding how to do basic Python mappings in a Spark context, so I really appreciate any help.


Comments (3)

疯了 2025-02-18 04:46:10


You have truly chosen a difficult topic. In Spark, 95%+ of things can be done without Python UDFs. You should always try to find a way not to create a UDF.

I've tried your UDF and got the same error, and I can't really tell why. I think it's something to do with data types, since you pass a Spark array into a function that expects numpy data types. I really can't tell much more...
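One common cause of this exact PickleException, though, is a UDF returning a numpy scalar (numpy.float64) instead of a plain Python float. A minimal sketch of the UDF rewritten with that in mind (an assumption, not something verified against the asker's exact data):

import numpy as np
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

@udf(returnType=DoubleType())
def dist(feat, cent):
    # Spark passes array columns in as plain Python lists, so convert
    # them to numpy arrays first; cast the numpy.float64 result back
    # to a Python float so Spark can serialize it as a DoubleType.
    return float(np.linalg.norm(np.array(feat) - np.array(cent)))

df.withColumn("distance", dist(col("features"), col("center"))).show()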

For Euclidean distance, it's possible to calculate it in Spark. It's not easy, though.

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [([0, 1, 0], [1.5, 2., 1.]),
     ([5, 7, 6], [10., 7., 7.])],
    ['features', 'center'])

distance = F.aggregate(
    F.transform(
        F.arrays_zip('features', 'center'),         # pair the two arrays element-wise
        lambda x: (x['features'] - x['center'])**2  # squared difference per element
    ),
    F.lit(0.0),                                     # accumulator starts at 0.0
    lambda acc, x: acc + x,                         # sum the squared differences
    lambda x: x**.5                                 # finish with the square root
)
df = df.withColumn('distance', distance)

df.show()
# +---------+----------------+------------------+
# | features|          center|          distance|
# +---------+----------------+------------------+
# |[0, 1, 0]| [1.5, 2.0, 1.0]|2.0615528128088303|
# |[5, 7, 6]|[10.0, 7.0, 7.0]|5.0990195135927845|
# +---------+----------------+------------------+
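One caveat (my note, not part of the original answer): the Python wrappers F.aggregate and F.transform were only added in Spark 3.1. On older versions the same higher-order functions are still reachable as SQL expressions through F.expr, as the last answer below shows.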
嘿看小鸭子会跑 2025-02-18 04:46:10
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import lit
from sklearn.metrics.pairwise import paired_distances

Alter df's schema to accommodate the dist column

sch = df.withColumn('dist', lit(90.087654623)).schema  # the literal's value is irrelevant; we only need the schema

Create a pandas function (used with mapInPandas) that calculates the distance

def euclidean_dist(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        # paired_distances computes the row-wise Euclidean distance
        # between the two list columns of each pandas batch
        yield pdf.assign(dist=paired_distances(pdf['features'].to_list(), pdf['center'].to_list()))

df.mapInPandas(euclidean_dist, schema=sch).show()

Solution

+---------+----------------+------------------+
| features|          center|              dist|
+---------+----------------+------------------+
|[0, 1, 0]| [1.5, 2.0, 1.0]|2.0615528128088303|
|[5, 7, 6]|[10.0, 7.0, 7.0]|5.0990195135927845|
+---------+----------------+------------------+
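As an aside (a sketch of mine, not part of the original answer): the dummy-literal trick is just a way to get a schema with one extra DoubleType field. Assuming StructType.add, which PySpark provides, the schema can also be built directly:

from pyspark.sql.types import DoubleType

# Equivalent to the withColumn/lit round-trip above: append a
# double field named 'dist' to the existing schema.
sch = df.schema.add('dist', DoubleType())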
挽清梦 2025-02-18 04:46:10


You can calculate the distance using only the PySpark and Spark SQL APIs:

import pyspark.sql.functions as f

# transform() squares each element-wise difference, aggregate() sums
# them, and f.sqrt() takes the final square root.
df = df.withColumn('distance', f.sqrt(f.expr(
    'aggregate(transform(features, (element, idx) -> '
    'pow(element - element_at(center, idx + 1), 2)), '
    'cast(0 as double), (acc, val) -> acc + val)')))
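A variation on the same idea (my sketch, not part of the original answer): Spark SQL's zip_with, also a 2.4-era higher-order function, pairs the two arrays directly instead of indexing into center with element_at:

# zip_with pairs features[i] with center[i], so no index bookkeeping
df = df.withColumn('distance', f.sqrt(f.expr(
    'aggregate(zip_with(features, center, (x, y) -> pow(x - y, 2)), '
    'cast(0 as double), (acc, val) -> acc + val)')))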