Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:32778)
I have a PySpark DataFrame like this:
x    y
656  78
766  87
677  63
...  ...
It has around 72 million rows. Now I want to plot a histogram of column y of this PySpark DataFrame. I have tried collect() and toPandas(), but the collect() method throws the following error:
[val.y for val in df.select('y').collect()]
Out:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:32778)
Traceback (most recent call last):
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 535, in collect
sock_info = self._jdf.collectToPython()
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value
format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o180.collectToPython
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/pyspark/sql/dataframe.py in collect(self)
534 with SCCallSiteSync(self._sc) as css:
--> 535 sock_info = self._jdf.collectToPython()
536 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
335 "An error occurred while calling {0}{1}{2}".
--> 336 format(target_id, ".", name))
337 else:
Py4JError: An error occurred while calling o180.collectToPython
During handling of the above exception, another exception occurred:
IndexError Traceback (most recent call last)
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _get_connection(self)
928 try:
--> 929 connection = self.deque.pop()
930 except IndexError:
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
ConnectionRefusedError Traceback (most recent call last)
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in start(self)
1066 try:
-> 1067 self.socket.connect((self.address, self.port))
1068 self.stream = self.socket.makefile("rb")
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Py4JNetworkError Traceback (most recent call last)
/tmp/ipykernel_29360/1990936990.py in <module>
----> 1 non_routine_saving = [val.savingsPercent for val in non_routine.select('savingsPercent').collect()]
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/pyspark/sql/dataframe.py in collect(self)
533 """
534 with SCCallSiteSync(self._sc) as css:
--> 535 sock_info = self._jdf.collectToPython()
536 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
537
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/pyspark/traceback_utils.py in __exit__(self, type, value, tb)
76 SCCallSiteSync._spark_stack_depth -= 1
77 if SCCallSiteSync._spark_stack_depth == 0:
---> 78 self._context._jsc.setCallSite(None)
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1253 proto.END_COMMAND_PART
1254
-> 1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
1257 answer, self.gateway_client, self.target_id, self.name)
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
981 if `binary` is `True`.
982 """
--> 983 connection = self._get_connection()
984 try:
985 response = connection.send_command(command)
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _get_connection(self)
929 connection = self.deque.pop()
930 except IndexError:
--> 931 connection = self._create_connection()
932 return connection
933
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _create_connection(self)
935 connection = GatewayConnection(
936 self.gateway_parameters, self.gateway_property)
--> 937 connection.start()
938 return connection
939
/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in start(self)
1077 "server ({0}:{1})".format(self.address, self.port)
1078 logger.exception(msg)
-> 1079 raise Py4JNetworkError(msg, e)
1080
1081 def _authenticate_connection(self):
Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:32778)
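Collecting all 72 million values to the driver is what seems to kill the gateway connection, so one alternative I am considering is to compute the histogram buckets on the cluster and only bring the bucket counts back. A minimal sketch, assuming y is numeric and ignoring null rows; the bin count of 50 is arbitrary:
import matplotlib.pyplot as plt
# Compute bucket edges and counts on the executors; only 50 counts return to the driver
edges, counts = df.select('y').na.drop().rdd.map(lambda r: r.y).histogram(50)
# Plot the pre-aggregated counts locally
widths = [edges[i + 1] - edges[i] for i in range(len(counts))]
plt.bar(edges[:-1], counts, width=widths, align='edge')
plt.xlabel('y')
plt.ylabel('count')
plt.show()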
Spark Config:
from __future__ import print_function
from platform import python_version
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import json
import csv
import pickle as pkl
import seaborn as sns
import plotly.express as px
from datetime import date, datetime, timedelta
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.types as T
from pyspark.sql import functions as f
from pyspark.sql.functions import col, countDistinct
from pyspark.sql.window import Window
pd.set_option('display.max_columns', None)
# Constants for application
APPLICATION_NAME = "p13n_data_introduction"
CHECKPOINT_DIRECTORY = "gs://p13n-storage2/user/s1b0jec"
spark_config = {}
spark_config["spark.executor.memory"] = "32G"
# spark_config["spark.executor.memoryOverhead"] = "4G"
spark_config["spark.executor.cores"] = "32"
spark_config["spark.driver.memory"] = "32G"
# spark_config["spark.shuffle.memoryFraction"] = "0"
# Executor config
spark_config["spark.dyamicAllocation.enable"] = "true"
spark_config["spark.dynamicAllocation.minExecutors"] = "100"
spark_config["spark.dynamicAllocation.maxExecutors"] = "300"
spark_config["spark.submit.deployMode"] = "client"
spark_config["spark.hive.mapred.supports.subdirectories"] = "true"
spark_config["spark.yarn.queue"] = "default"
spark_config["spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive"] = "true"
spark_config["spark.hadoop.hive.exec.dynamic.partition"] = "true"
spark_config["spark.hadoop.hive.exec.dynamic.partition.mode"] = "nonstrict"
spark_config["spark.hadoop.hive.exec.max.dynamic.partitions.pernode"] = "100"
spark_config["spark.yarn.dist.archives"] = "gs://p13n-storage2/user/s1b0jec/envs/spark.zip#mypython"
# spark_config["spark.yarn.appMasterEnv.PYSPARK_PYTHON"] =
# spark_config["spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON"] = "python" # For client mode it's the default `python` executable, whereas for cluster mode we use the distributed python environment
os.environ['PYSPARK_PYTHON'] = "./mypython/spark/bin/python"
os.environ['PYSPARK_DRIVER_PYTHON'] = "python"
spark_config["spark.jars"] = "/opt/lib/bfdms-ien/dp1.5/lib/apache-hive-1.3.0-SNAPSHOT-bin/hcatalog/share/hcatalog/hive-hcatalog-core-1.3.0-SNAPSHOT.jar,/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar,/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar,/opt/lib/bfdms-ien/dp1.5/lib/spark-2.4.8-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar"
spark_conf = SparkConf().setAll(spark_config.items())
spark = SparkSession.builder.appName(APPLICATION_NAME) \
.config(conf=spark_conf).enableHiveSupport().getOrCreate()
print("Spark session created: ", spark.sparkContext.applicationId)
spark.sparkContext.setCheckpointDir(CHECKPOINT_DIRECTORY)
import warnings
warnings.filterwarnings("ignore")
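If an approximate distribution is acceptable, another option I am considering is sampling before toPandas() so that only a manageable fraction of rows reaches the driver. A sketch; the fraction and seed are arbitrary:
import matplotlib.pyplot as plt
# Pull roughly 1% of the rows to the driver and plot with pandas/matplotlib
pdf = df.select('y').sample(fraction=0.01, seed=42).toPandas()
pdf['y'].plot.hist(bins=50)
plt.show()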