PySpark error when getting a DataFrame with the pandas API on Spark

My folder structure currently looks like this:
|- logger
|--- __init__.py
|--- logger.py
|- another_package
|--- __init__.py
|--- module1.py
|- models
|--- model1
|------ main.py
|------ model1_utilities.py

The Spark context and session are started in main.py, which calls into model1_utilities.py like this:

from model1_utilities import function_a  # assumed import; main.py calls into model1_utilities.py

results = function_a(params)
logger.info('Calculation completed')
num_rows = results.shape[0]  # shape triggers a Spark count() job, per the traceback below

I also have a log statement in model1_utilities.py, logger.info("Completed function call"), just before the return statement.
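
For reference, here is a minimal sketch of what model1_utilities.py presumably looks like, reconstructed from the traceback below (which shows import logger at line 3 of that file). The body of function_a is a placeholder, and it assumes the logger package exposes an info() function:

import pyspark.pandas as ps

import logger  # line 3 per the traceback; this is the import that fails on the Spark worker

def function_a(params):
    # placeholder standing in for the real model computation
    results = ps.DataFrame({"value": list(params)})
    logger.info("Completed function call")  # logged just before the return
    return results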

Now this is my error:

INFO:main:Completed function call                               
INFO:main:Calculation completed
22/06/16 15:04:36 ERROR Executor: Exception in task 0.0 in stage 451.0 (TID 908)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/user1/Desktop/code/models/model1/model1_utilities.py", line 3, in <module>
    import logger
ModuleNotFoundError: No module named 'logger'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
22/06/16 15:04:36 WARN TaskSetManager: Lost task 0.0 in stage 451.0 (TID 908) (10.0.0.162 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/user1/Desktop/code/models/model1/model1_utilities.py", line 3, in <module>
    import logger
ModuleNotFoundError: No module named 'logger'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

22/06/16 15:04:36 ERROR TaskSetManager: Task 0 in stage 451.0 failed 1 times; aborting job
ERROR:main:Program did not finish successfully
Traceback (most recent call last):
  File "/Users/user1/Desktop/code/models/model1/main.py, line 174, in <module>
    num_alerts = results.shape[0]
  File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/pandas/frame.py", line 7445, in shape
    return len(self), len(self.columns)
  File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/pandas/frame.py", line 11909, in __len__
    return self._internal.resolved_copy.spark_frame.count()
  File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 680, in count
    return int(self._jdf.count())
  File "/Users/user1/Desktop/work/tools/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/user1/Desktop/code/models/model1/model1_utilities.py", line 3, in <module>
    import logger
ModuleNotFoundError: No module named 'logger'

logger is imported in the model1_utilities.py file. To make the packages importable, I added their path in main.py; I haven't done the same in model1_utilities.py (not sure if that's necessary). But I don't understand why this causes a problem. I searched for known issues with using modules and packages in PySpark jobs but couldn't find anything. Any leads would help. Thanks!
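
(For concreteness, the path setup in main.py presumably looks something like the sketch below; the two-levels-up depth is an assumption based on the folder layout above.)

import os
import sys

# hypothetical: make the repository root importable so that 'logger' and
# 'another_package' resolve as top-level packages on the driver
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))

import logger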


Answer from 半枫, posted 2025-02-15 05:48:17:

Turns out this was not a PySpark error. The error appeared because I was not adding the package path in model1_utilities.py; adding it there fixed the error. I wasn't aware that Python required the package path to be added in every file that wants to use the package.
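
A minimal sketch of that fix at the top of model1_utilities.py, assuming the repository root sits two directories above the file (the depth is inferred from the folder layout in the question):

import os
import sys

# prepend the repository root so 'logger' resolves as a top-level package,
# even when this module is re-imported from scratch by a Spark Python worker
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))

import logger

This also explains the behavior: Spark's Python workers run in separate processes, so a sys.path change made in main.py only affects the driver. When a worker deserializes the task, it re-imports model1_utilities.py and hits import logger before any driver-side path setup could apply. An alternative is to ship the package to the workers explicitly, e.g. by zipping it and registering it with SparkContext.addPyFile.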
