Using a class method as a PySpark UDF

Posted on 2025-01-20 03:20:35

My problem: how can I call a function from another function inside a class when using a PySpark UDF? I am trying to write a PySpark UDF from a method of a class called Anomalie, defined in the file devAM_hive.py:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
import re

spark = SparkSession.builder.getOrCreate()

class Anomalie():
    def __init__(self):
        # Wrap the aux method as a UDF returning an array of strings
        self.Anomalie_udf = F.udf(Anomalie.aux, ArrayType(StringType()))

    @staticmethod
    def aux(texte):
        # Pull out the user code that follows a dd.mm.yyyy hh.mm.ss timestamp
        code_utilisateur = re.findall(
            r'[\s]*\d{2}.\d{2}.\d{4}[\s]*\d{2}.\d{2}.\d{2}\s(\w?\.?\s?.*)\s\(',
            texte)
        return code_utilisateur

    def auto_test(self, df):
        df = df.withColumn("name", self.Anomalie_udf(F.col("Description")))
        return df

When I call this from the main file, I get the error "No module named 'devAM_hive'", even though the module in which the class is defined has been imported.

from devAM_hive import *

A = Anomalie()
df = A.auto_test(row_data)
df.select("name").show(50)

The error message:

22/04/09 14:30:58 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 588, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/opt/mapr/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'devAM_hive'


Comments (1)

一百个冬季 2025-01-27 03:20:35

When I call this from the main file, I get an error saying "No module named 'devAM_hive'". But the module in which I defined the class is imported.

The import works because you are importing the module on the driver, where the file is available (it sits right next to your main file). But execution fails because your executors don't have it. What you want to do is distribute that class file with --py-files; by doing so, the module will be on the executors' Python path.

spark = (SparkSession
    .builder
    .appName('Test App')
    .config('spark.submit.pyFiles', '/path/to/devAM_hive.py')
    .getOrCreate()
)
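
Equivalently, if the session is already running, the module can be shipped at runtime with SparkContext.addPyFile, which also makes it importable on the executors. A minimal sketch, assuming the file sits at /path/to/devAM_hive.py on the driver (the path is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Test App').getOrCreate()

# Distribute the module to every executor; the path must point at
# devAM_hive.py as it exists on the driver machine.
spark.sparkContext.addPyFile('/path/to/devAM_hive.py')

# Import only after the file has been distributed.
from devAM_hive import Anomalie

And when launching with spark-submit, passing --py-files /path/to/devAM_hive.py on the command line has the same effect as the spark.submit.pyFiles setting shown above.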
