Spark: 用Python运行机器学习例子时出现错误
在下spark新手,最近一直在学习,用pyspark跑了一些例子,都没有问题,但是运行ml例子中的random_forest_example.py的时候却出现如下错误:
- py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
- org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.flush(Unknown Source)
at java.io.DataOutputStream.flush(Unknown Source)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:251)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
运行的代码如下:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row, SQLContext
"""
A simple example demonstrating a RandomForest Classification/Regression Pipeline.
Run with:
bin/spark-submit examples/src/main/python/ml/random_forest_example.py
"""
def testClassification(train, test):
# Train a RandomForest model.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
# Note: Use larger numTrees in practice.
rf = RandomForestClassifier(labelCol="indexedLabel", numTrees=3, maxDepth=4)
model = rf.fit(train)
predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
.map(lambda x: (x.prediction, x.indexedLabel))
metrics = MulticlassMetrics(predictionAndLabels)
print("weighted f-measure %.3f" % metrics.weightedFMeasure())
print("precision %s" % metrics.precision())
print("recall %s" % metrics.recall())
def testRegression(train, test):
# Train a RandomForest model.
# Note: Use larger numTrees in practice.
rf = RandomForestRegressor(labelCol="indexedLabel", numTrees=3, maxDepth=4)
model = rf.fit(train)
predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
.map(lambda x: (x.prediction, x.indexedLabel))
metrics = RegressionMetrics(predictionAndLabels)
print("rmse %.3f" % metrics.rootMeanSquaredError)
print("r2 %.3f" % metrics.r2)
print("mae %.3f" % metrics.meanAbsoluteError)
if __name__ == "__main__":
if len(sys.argv) > 1:
print("Usage: random_forest_example", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonRandomForestExample")
sqlContext = SQLContext(sc)
# Load and parse the data file into a dataframe.
df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()
# Map labels into an indexed column of labels in [0, numLabels)
stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
[train, test] = td.randomSplit([0.7, 0.3])
testClassification(train, test)
testRegression(train, test)
sc.stop()
逐行运行发现问题出在
# Load and parse the data file into a dataframe.
df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()
中的toDF()方法上,loadLibSVMFile没有问题,得到的是RDD,而且可以collect得到数组,只是toDF()这一步出现了问题。在下新手不知道问题出在哪里,还请各位帮忙。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我也遇到这样的问题 你解决了么!??
可以更新一下sbt