使用idea打包spark jar后运行出错

发布于 2021-11-25 13:55:46 字数 3460 浏览 807 评论 1

在idea上使用scala插件,创建scala-maven工程成功运行,打jar包出错

代码:


package com.chanct.idap.ssm.spark


import com.chanct.idap.ssm.common.{HbaseUtils, PlatformConfig}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
  * Created by lichao on 16-4-4.
  *
  */
object KafKa2SparkStreaming2Hbase {
   def main(args: Array[String]) {
     val zkQuorum = "c1d8:2181,c1d9:2181,c1d10:2181"
     val group = "1"
     val topics = "scala_api_topic"
     val numThreads = 2


//     val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").setMaster("local[2]").set("spark.eventLog.overwrite","true")
     val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").set("spark.eventLog.overwrite","true")
     val ssc = new StreamingContext(sparkConf, Seconds(5))
     ssc.checkpoint("checkpoint")


     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
     val words = lines.flatMap(_.split(" "))




     val pairs = words.map(word => (word, 1))


     val wordCounts = pairs.reduceByKey(_ + _)






     wordCounts.print()


     wordCounts.foreachRDD {
       rdd => rdd.foreachPartition {


         partition =>
           val conf = PlatformConfig.loadHbaseConf() //加载hbase配置文件
           val conn = HbaseUtils.getConnection(conf) //获取hbase连接


           val userTable = TableName.valueOf("WCTest")
           val table = conn.getTable(userTable)
           partition.foreach {
             w =>
               try {
                 val put = new Put(Bytes.toBytes(System.currentTimeMillis().toString))
                 put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("value"), Bytes.toBytes(w._1.toString))
                 put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("count"), Bytes.toBytes(w._2.toString))
                 table.put(put)


               } catch {
                 case _: Exception => println("raw error!")
               }
           }
           table.close()
           conn.close()


       }
     }
     wordCounts.print()


     ssc.start()
     ssc.awaitTermination()
   }
 }
              

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

只为守护你 2021-11-26 06:16:29

问题解决了,spark与hbase衔接由于版本的问题,会有jar包找不到的问题,在spark配置中加入spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

在spark提交任务的时候:spark-submit --master yarn-cluster --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar ....

http://community.cloudera.com/t5/Apache-Hadoop-Concepts-and/Class-not-found-running-Spark-sample-hbase-importformat-py-in/td-p/27084

终于搞完了,花了一天时间还是很有收获的

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文