Spark: Error when running a machine learning example with Python

Posted on 2022-09-01 15:27:37

I'm new to Spark and have been learning it recently. I've run a number of examples with PySpark without any problems, but when I run the ML example random_forest_example.py I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.flush(Unknown Source)
at java.io.DataOutputStream.flush(Unknown Source)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:251)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The code I ran is as follows:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row, SQLContext

"""
A simple example demonstrating a RandomForest Classification/Regression Pipeline.
Run with:
  bin/spark-submit examples/src/main/python/ml/random_forest_example.py
"""


def testClassification(train, test):
    # Train a RandomForest model.
    # Setting featureSubsetStrategy="auto" lets the algorithm choose.
    # Note: Use larger numTrees in practice.

    rf = RandomForestClassifier(labelCol="indexedLabel", numTrees=3, maxDepth=4)

    model = rf.fit(train)
    predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
        .map(lambda x: (x.prediction, x.indexedLabel))

    metrics = MulticlassMetrics(predictionAndLabels)
    print("weighted f-measure %.3f" % metrics.weightedFMeasure())
    print("precision %s" % metrics.precision())
    print("recall %s" % metrics.recall())


def testRegression(train, test):
    # Train a RandomForest model.
    # Note: Use larger numTrees in practice.

    rf = RandomForestRegressor(labelCol="indexedLabel", numTrees=3, maxDepth=4)

    model = rf.fit(train)
    predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
        .map(lambda x: (x.prediction, x.indexedLabel))

    metrics = RegressionMetrics(predictionAndLabels)
    print("rmse %.3f" % metrics.rootMeanSquaredError)
    print("r2 %.3f" % metrics.r2)
    print("mae %.3f" % metrics.meanAbsoluteError)


if __name__ == "__main__":
    if len(sys.argv) > 1:
        print("Usage: random_forest_example", file=sys.stderr)
        exit(1)
    sc = SparkContext(appName="PythonRandomForestExample")
    sqlContext = SQLContext(sc)

    # Load and parse the data file into a dataframe.
    df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()

    # Map labels into an indexed column of labels in [0, numLabels)
    stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
    si_model = stringIndexer.fit(df)
    td = si_model.transform(df)
    [train, test] = td.randomSplit([0.7, 0.3])
    testClassification(train, test)
    testRegression(train, test)
    sc.stop()

Running it line by line, I found the problem is in

# Load and parse the data file into a dataframe.
df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()

specifically in the toDF() call. loadLibSVMFile itself works fine: it returns an RDD, and I can collect() it into an array. Only the toDF() step fails. As a beginner I can't tell where the problem is; any help would be appreciated.
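For anyone trying to narrow this down, a minimal isolation sketch is shown below. It assumes the same Spark 1.4-era PySpark setup on Windows as in the question; the only changes are using a raw string for the Windows path (so the backslashes are not interpreted as escape sequences) and separating the load from the toDF() step so the failing point is explicit. This is a debugging sketch, not a confirmed fix: a "Connection reset by peer" in PythonRDD's WriterThread typically indicates the Python worker process exited while the JVM was still writing data to it.

# Minimal isolation sketch (assumptions: Spark 1.4-era PySpark on Windows,
# same data file path as in the question).
from pyspark import SparkContext
from pyspark.mllib.util import MLUtils
from pyspark.sql import SQLContext

sc = SparkContext(appName="ToDFDebug")
sqlContext = SQLContext(sc)  # creating a SQLContext is what attaches toDF() to RDDs

# Raw string (r"...") so the Windows backslashes are not read as escape sequences.
path = r"D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt"

data = MLUtils.loadLibSVMFile(sc, path)
print(data.count())   # per the question, the RDD itself behaves normally

df = data.toDF()      # the step that triggers the Py4JJavaError in the question
df.show(5)

sc.stop()

If the count succeeds but toDF() still fails with the same stack trace, the problem is in the Python worker that serializes the rows for the DataFrame conversion rather than in the data file or the path.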

Comments (2)

命比纸薄 2022-09-08 15:27:37

I ran into the same problem. Have you solved it?

埋葬我深情 2022-09-08 15:27:37

You could try updating sbt.
