Using a class method as a PySpark UDF

My problem: how can I call a function from another method of a class through a PySpark UDF? I am trying to write a PySpark UDF that uses a method of a class called Anomalie, defined in the file devAM_hive.py:
import re

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

class Anomalie():
    def __init__(self):
        # Wrap the helper as a UDF that returns an array of strings
        self.Anomalie_udf = F.udf(Anomalie.aux, ArrayType(StringType()))

    @staticmethod
    def aux(texte):
        # Pull out the user code that follows a "dd.mm.yyyy hh.mm.ss" timestamp
        code_utilisateur = re.findall(r'[\s]*\d{2}.\d{2}.\d{4}[\s]*\d{2}.\d{2}.\d{2}\s(\w?\.?\s?.*)\s\(', texte)
        return code_utilisateur

    def auto_test(self, df):
        df = df.withColumn("name", self.Anomalie_udf(F.col("Description")))
        return df
When I call this from the main file, I get the error "No module named 'devAM_hive'", even though the module in which I defined the class is imported:
from devAM_hive import *
A=Anomalie()
df=A.auto_test(row_data)
df.select("name").show(50)
The error message:
22/04/09 14:30:58 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 588, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'devAM_hive'
1 Answer
Importing works because you import the module on the driver, where the file is available (it sits next to your main file). But running the UDF won't work because your executors don't have it. So what you want to do is distribute that class using --py-files. By doing that, the class will be on the executors' Python path.
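For example, if the entry script is called main.py and sits next to devAM_hive.py (the name main.py is an assumption; the question doesn't give it), the submit command would look roughly like this:

spark-submit --py-files devAM_hive.py main.py

Alternatively, a minimal sketch of achieving the same thing from the driver itself, using SparkContext.addPyFile to ship the module to the executors before the UDF first runs:

# Driver-side sketch: distribute devAM_hive.py to every executor.
# The path is assumed to be relative to where the driver is launched.
spark.sparkContext.addPyFile("devAM_hive.py")

from devAM_hive import Anomalie

A = Anomalie()
df = A.auto_test(row_data)
df.select("name").show(50)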