在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决?

发布于 2022-09-01 21:40:35 字数 741 浏览 16 评论 0

我的Spark Streaming代码如下所示:

val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2)

val words = lines.filter(examtep(_))
words.foreachRDD(exam(_))

//some other code

 def exam(rdd:RDD[SparkFlumeEvent]):Unit={
    if(rdd.count()>0) {
      println("****Something*****")
      val newrdd=rdd.map(sfe=>{
      val tmp=new String(sfe.event.getBody.array())
      tmp
      })
    newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest")
    }
}

当words.foreachRDD(exam(_))中每次执行exam()方法的时候,都会执行newrdd.saveAsTextFile("/user/''''''"),但是HDFS上Temperaturetest文件夹里的内容每次都会被覆盖掉,只保存着最后一次saveAsTextFIle的内容,怎样才能让所有数据都存储到Temperaturetest中呢??

PS:我的Spark版本为1.2.1

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

西瓜 2022-09-08 21:40:35

朋友你好 请问这个问题你解决了没? 我也遇到这样的问题了 能否指点一二

一生独一 2022-09-08 21:40:35

转为data_frame,然后用sql语句执行想要的操作。
类似于如下:

    def process(time,rdd):
        try:            
            hqlContext = getHqlContextInstance(rdd.context)
            
            hqlContext.sql("use shy")
            userframe =hqlContext.createDataFrame(rdd,['create_time','userid','amount'])
            userframe.registerTempTable("userframe")
            hqlContext.sql("insert overwrite table trade_order_temp select * from userframe")
            
            
            print "************"
            
        except Exception,e:
            print e

    rowrdd.foreachRDD(process)
秉烛思 2022-09-08 21:40:35

我也遇到了相同的问题。foreachRDD默认保存的文件名就是part0000_ ... part0000n,每一个rdd都是这样。所以在同一路径下后面的文件可能会覆盖前面的。我是在文件夹后面再加上时间戳,来避免覆盖。不知道还有没有更好的方法
newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest"+System.currentTimeMillis())

春庭雪 2022-09-08 21:40:35

可以 使用hdfs的append方法去实现

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文