在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决?
我的Spark Streaming代码如下所示:
val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2)
val words = lines.filter(examtep(_))
words.foreachRDD(exam(_))
//some other code
def exam(rdd:RDD[SparkFlumeEvent]):Unit={
if(rdd.count()>0) {
println("****Something*****")
val newrdd=rdd.map(sfe=>{
val tmp=new String(sfe.event.getBody.array())
tmp
})
newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest")
}
}
当words.foreachRDD(exam(_))中每次执行exam()方法的时候,都会执行newrdd.saveAsTextFile("/user/''''''"),但是HDFS上Temperaturetest文件夹里的内容每次都会被覆盖掉,只保存着最后一次saveAsTextFIle的内容,怎样才能让所有数据都存储到Temperaturetest中呢??
PS:我的Spark版本为1.2.1
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
朋友你好 请问这个问题你解决了没? 我也遇到这样的问题了 能否指点一二
转为data_frame,然后用sql语句执行想要的操作。
类似于如下:
我也遇到了相同的问题。foreachRDD默认保存的文件名就是part0000_ ... part0000n,每一个rdd都是这样。所以在同一路径下后面的文件可能会覆盖前面的。我是在文件夹后面再加上时间戳,来避免覆盖。不知道还有没有更好的方法
newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest"+System.currentTimeMillis())
可以 使用hdfs的append方法去实现