PySpark error when getting a DataFrame's shape using the pandas API on Spark
My folder structure is currently this:
|- logger
|--- __init__.py
|--- logger.py
|- another_package
|--- __init__.py
|--- module1.py
|- models
|--- model1
|------ main.py
|------ model1_utilities.py
The Spark context and session are started in main.py. main.py calls model1_utilities.py like this:
results = function_a(params)
logger.info('Calculation completed')
num_rows = results.shape[0]
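For reference, a minimal sketch of what this main.py setup presumably looks like (the sys.path call, the repo-root path, and the params placeholder are assumptions for illustration, not the poster's actual code):

import sys
sys.path.append('/Users/user1/Desktop/code')  # hypothetical repo root containing logger/ and another_package/

import logger  # the custom logger package, not the stdlib logging module
from pyspark.sql import SparkSession
from model1_utilities import function_a

spark = SparkSession.builder.appName('model1').getOrCreate()

params = {}  # placeholder arguments for the model run
results = function_a(params)   # returns a pandas-on-Spark DataFrame
logger.info('Calculation completed')
num_rows = results.shape[0]    # .shape triggers a Spark count() job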
I also have a log statement within model1_utilities.py, logger.info("Completed function call"), just before the return statement.
Now this is my error:
INFO:main:Completed function call
INFO:main:Calculation completed
22/06/16 15:04:36 ERROR Executor: Exception in task 0.0 in stage 451.0 (TID 908)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/user1/Desktop/code/models/model1/model1_utilities.py", line 3, in <module>
import logger
ModuleNotFoundError: No module named 'logger'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
22/06/16 15:04:36 WARN TaskSetManager: Lost task 0.0 in stage 451.0 (TID 908) (10.0.0.162 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/user1/Desktop/code/models/model1/model1_utilities.py", line 3, in <module>
import logger
ModuleNotFoundError: No module named 'logger'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage87.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
22/06/16 15:04:36 ERROR TaskSetManager: Task 0 in stage 451.0 failed 1 times; aborting job
ERROR:main:Program did not finish successfully
Traceback (most recent call last):
File "/Users/user1/Desktop/code/models/model1/main.py, line 174, in <module>
num_alerts = results.shape[0]
File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/pandas/frame.py", line 7445, in shape
return len(self), len(self.columns)
File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/pandas/frame.py", line 11909, in __len__
return self._internal.resolved_copy.spark_frame.count()
File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 680, in count
return int(self._jdf.count())
File "/Users/user1/Desktop/work/tools/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/Users/user1/opt/anaconda3/envs/work/lib/python3.9/site-packages/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/Users/user1/Desktop/code/models/model1/model1_utilities.py", line 3, in <module>
import logger
ModuleNotFoundError: No module named 'logger'
logger is imported in the model1_utilities.py file. In order to access the packages, I added the path to the packages in main.py. I haven't done the same in model1_utilities.py (not sure whether that's necessary). But I don't understand why this is the issue. I searched to see whether there are known problems with using modules and packages in PySpark jobs but couldn't find anything. Any leads would help! Thanks!
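A quick way to see why a path added in main.py doesn't reach the executors is to inspect sys.path inside a Python worker; a debugging sketch (spark here is the session created in main.py):

# Runs the lambda in an executor's Python worker, not in the driver
rdd = spark.sparkContext.parallelize([0], 1)
print(rdd.map(lambda _: __import__('sys').path).collect())
# The directory appended in main.py will typically be absent from this list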
Answer:
Turns out this was not a PySpark error. The error appeared because I was not adding the path to the packages in model1_utilities.py; adding it there fixed the error. I wasn't aware that Python required the package path to be added in every file that wants to use the package.
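A plausible mechanism, offered as an inference (the traceback supports it, though the answer doesn't spell it out): pandas-on-Spark operations such as .shape run the DataFrame's Python (Arrow UDF) stages in separate executor worker processes, and each worker re-imports model1_utilities.py from scratch (hence the "in <module>" frames above). A sys.path entry added in main.py lives only in the driver process, so the worker's fresh import of logger fails. A minimal sketch of the fix, assuming sys.path.append is the mechanism and with an illustrative path:

# model1_utilities.py
import sys
sys.path.append('/Users/user1/Desktop/code')  # hypothetical repo root; this runs in every process that imports the module

import logger


def function_a(params):
    results = ...  # pandas-on-Spark computation elided
    logger.info('Completed function call')
    return results

Note that mutating sys.path only works when the code tree exists at the same location on every worker machine; on a real cluster, shipping the package instead, e.g. with spark.sparkContext.addPyFile('logger.zip') or spark-submit --py-files logger.zip, is the more portable option.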