sparkstreaming local[4]消费Kafka四分区topic,只有一个消费者在工作
sparkstreaming消费Kafka,用local[4]模式处理有四个分区的topic,为什么只起来一个消费者,手动维护的四个分区offset更新值都一样,有碰到过这种情况…
Spark on yarn 在创建Hbase的Connection时,报 ClassNotFoundException
ConnectionFactory.createConnection(configuration) 在执行以上方法时候报的错误:Caused by: java.lang.ClassNotFoundException: org.apache.hadoop…
python开发,spark接收kafka的输入流时
import cv2 import sys import findspark findspark.init() from kafka import KafkaConsumer from pyspark import SparkContext from pyspark.strea…
序列化后的数据,怎么从Spark读出并转成DataSet或者DataFrame?
想用spark分析zipkin输出的数据,结果第一步就出现困难... =。= 在kafka中有Span类型的数据,格式是Bytes[],需要通过SpanBytesDecoder这么一个类转…
flume+kafka+spark Streaming监听处理文件,但需要streaming能对完整的单个文件进行处理。
用flume+kafka+spark streaming框架分析数据,数据文件是导入的一个一个的,也就是完整的导入的,因为这些文件中每个文件的数据都是应用场景中一次事…
SparkStream checkpoint 的几个问题
刚用spark streaming,有几个关于checkpoint的疑问: checkpoint有两种,一种是对driver的meta的,一种是对data的。手册上说,只有用stateful transf…
spark如何计算前后两条数据(kafka数据源)的差值?
数据源是kafka,有一个字段是时间戳,我们想要计算前后两条数据的时间戳的差值,然后新增一个字段存储这个值再发出去,要怎么做呢?我查了一下好像要r…
spark streaming 运行 8 个小时左右挂掉,请问是问什么呢
报错日志 com.slhan.service.BusinessService 的 341 行是获取广播变量的值 18/09/08 13:50:02 ERROR scheduler.JobScheduler: Error running job st…
spark中两份很大的数据如何能避免join时的shuffle
目的:在spark中有两份很大的数据需要join,两份输入数据都含有userid这个字段,现在需要根据userid关联,我希望能避免shuffle。 已完成:我预先对两…
spark sql 解析嵌套对象数组的json
1.现在有json数据如下 {"id":11,"data":[{"package":"com.browser1","activetime":60000},{"package":"com.browser6","activetime":1205000},{"packa…
createDirectStream.foreachRDD调用外部对象出错
如下图: def update_model(rdd),调用外部声明的mixture_model会出现如下错误: 直接在update_model中声明 ·mixture_model·是没有问题的,但是每次fo…
spark-stream中如何依次遍历同一个window中每个batch的数据呢?
批次间隔为10s, 窗口大小为20s, 步长为10s, 这样每个window应该有2个批次的数据,但是我用DStream.foreachRDD()每次只执行一次,按我理解因为有2个批…
使用kafka向elasticsearch导入数据,如何获取导入数据的进度以及错误的日志?
我们目前采用的数据导入的框架是这样的: 但是consumer是我们用kafka-python写的python代码,当程序崩溃的时候,有保护机制重新启动,但是有时会重复…