Adding a column in PySpark as a function of two other columns
I have two columns in a data frame df in PySpark:

+----------+-----------+
| features | center    |
+----------+-----------+
| [0,1,0]  | [1.5,2,1] |
| [5,7,6]  | [10,7,7]  |
+----------+-----------+
I want to create a function which calculates the Euclidean distance between df['features'] and df['center'] and maps it to a new column in df, distance.
Let's say our function looks like the following:
import numpy as np
from pyspark.sql.functions import udf

@udf
def dist(feat, cent):
    return np.linalg.norm(feat - cent)
How would I actually apply this to do what I want it to do? I was trying things like
df.withColumn("distance", dist(col("features"), col("center"))).show()
but that gives me the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 869.0 failed 4 times, most recent failure: Lost task 0.3 in stage 869.0 (TID 26423) (10.50.91.134 executor 35): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
I am really struggling with understanding how to do basic Python mappings in a Spark context, so I really appreciate any help.
Comments (3)
You have truly chosen a difficult topic. In Spark, 95%+ of things can be done without Python UDFs, and you should always try to find a way to avoid creating one.
I tried your UDF and got the same error, and I can't really tell why. I suspect it is something with the data types, since you pass a Spark array into a function that expects numpy data types. I really can't say much more...
As for the Euclidean distance, it is possible to calculate it in Spark itself. Not an easy one, though.
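For illustration, a minimal sketch of one way to do it with the built-in array functions, assuming Spark 3.1+ (where zip_with and aggregate are exposed in pyspark.sql.functions) and the features/center columns from the question:

from pyspark.sql import functions as F

# sqrt of the sum of squared elementwise differences:
# sqrt(sum_i (features[i] - center[i])^2)
df = df.withColumn(
    "distance",
    F.sqrt(
        F.aggregate(
            F.zip_with("features", "center", lambda a, b: (a - b) ** 2),
            F.lit(0.0),
            lambda acc, x: acc + x,
        )
    ),
)
df.show()

This reproduces np.linalg.norm(feat - cent) without any Python UDF, so all the work stays inside the JVM.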
Alter the df schema to accommodate the dist column
Create a pandas udf that calculates the distance
Solution
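A minimal sketch of what such a pandas udf could look like, assuming the features and center columns from the question; the declared double return type covers the new column's schema, and the cast to a plain Python float avoids handing numpy scalars back to Spark (which is what the plain @udf above trips over):

import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def dist(feat: pd.Series, cent: pd.Series) -> pd.Series:
    # Each row holds an array; convert both to numpy arrays, subtract
    # elementwise, and take the L2 norm, returning plain Python floats.
    return pd.Series(
        [float(np.linalg.norm(np.array(f) - np.array(c)))
         for f, c in zip(feat, cent)]
    )

df.withColumn("distance", dist(F.col("features"), F.col("center"))).show()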
You can calculate the distance using only PySpark and Spark SQL APIs:
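A sketch of how that might look, written with F.expr so it should also work on Spark 2.4+, where zip_with and aggregate exist as SQL functions but are not yet exposed in the Python API:

from pyspark.sql import functions as F

# Same formula as the earlier sketch, sqrt of the summed squared
# elementwise differences, written as a single SQL expression
# (0D is a double literal).
df = df.withColumn(
    "distance",
    F.expr(
        "sqrt(aggregate(zip_with(features, center, (a, b) -> (a - b) * (a - b)), "
        "0D, (acc, x) -> acc + x))"
    ),
)
df.show()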