spark 怎么对一个内容为一批Iterable[Double]的Rdd中的每一个Iterable[Double]做增值修改
小弟刚学spark
,遇到个问题,求指教:
val rdd = ... // 内容是 Iterable[Double]
现在想对rdd
的每一个Iterable[Double]
中按某种方法插入某些元素,怎么做?
我写了个函数做插值,在master
为local
的时候没有问题,master
使用多worker
就错了,怎么写并发安全的spark
程序啊?
栗子:
L1: [(103, v3),(106, v6)]
L2: [(101, v1),(102, v2),(103, v3),(104, v4),(105, v5),(106, v6)]
就是从L1
变到L2
, 其中v3
,v6
是已知的值,v1
,v2
,v4
,v5
使用默认的0
更新:
case class Metric(name: String, value: Double, timestamp: Long)
implicit val metricReads = Json.format[Metric]
......
val input = sc.textFile("/data/xxxxxxx.txt") // 每行一个json结构
val parsed = input.map(Json.parse(_)) // Parse json
val result = parsed.flatMap(record => metricReads.reads(record).asOpt) // 得到Metric的rdd
val grouped = result.groupBy(_.name) // 根据Metric的name字段做groupBy,得到RDD[(name, Iterable[Metric])]
val output = grouped.mapValues(XXX)
因为每个Iterable[Metric]
的个数都不相等,所以需要在mapValues
中对每个Iterable[Metric]
做插值,都插成360个点的集合,插值逻辑就是按照timestamp
每隔10s插一个点。
xxxxxxx.txt
{"name": "foo", "value": 1.1, "timestamp": 1469682268}
{"name": "bar", "value": 1.2, "timestamp": 1469682276}
{"name": "foo", "value": 1.3, "timestamp": 1469682417}
{"name": "bar", "value": 1.4, "timestamp": 1469683205}
{"name": "foo", "value": 1.5, "timestamp": 1469683369}
groupBy
之后:
[
("foo", [{"timestamp": 1469682268, "value": 1.1}, {"timestamp": 1469682417, "value": 1.3}, {"timestamp": 1469683369, "value": 1.5}]),
("bar", [{"timestamp": 1469682276, "value": 1.4}, {"timestamp": 1469683205, "value": 1.5}])
]
最终就是要让foo
和bar
的点数相同,每隔10s插一个点,共360个,上面的时间都是2016-07-28 13:00:00
到2016-07-28 14:00:00
之间的,这段时间内的每个0~10s
间隔之内有值就不插,没值的话就用0
。比如:1469682000~1469682010
之间没有值,就用0
补,1469682260~1469682270
之间有值1.1
,就用1.1
。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这样做是没法并发的,如果你要扩充rdd,可以把需要插入的数据也做成一个rdd,然后用spark提供的join、union、intersection等操作来处理两个rdd
参考程序,没有加json依赖,所以自己parse的