How to generate a DStream from RateStreamSource in Spark
I have a case class in Scala like this:

```scala
import scala.util.Random

case class RemoteCopyGroup(
    ts: Long,
    systemId: String,
    name: String,
    id: Int,
    role: String,
    mode: String,
    remoteGroupName: String)

object RemoteCopyGroup {
  // to be removed
  val arrayOfIds = Array("CZ210507H1", "CZ20030W4H", "CZ29400JBJ")
  def randomSerialNumber = Random.shuffle(arrayOfIds.toList).head

  def get(x: Rate): RemoteCopyGroup = {
    RemoteCopyGroup(
      x.timestamp.getTime / 1000,
      randomSerialNumber,
      Random.nextString(2),
      Random.nextInt(3),
      Random.nextString(2),
      Random.nextString(2),
      Random.nextString(2))
  }
}
```
I am generating a stream of data using RateStreamSource like this:

```scala
val remoteCopyGroupDS: Dataset[(String, RemoteCopyGroup)] = sparkSession
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", rate)
  .load()
  .as[Rate]
  .filter(_.value % 10 == 0)
  .map(RemoteCopyGroup.get)
  .map(rcg => rcg.systemId -> rcg)
```
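(For context: the `.as[Rate]` cast assumes a case class matching the built-in rate source's output schema, which has exactly two columns, `timestamp` and `value`. The question doesn't show this class, so a plausible sketch of what it assumes is:)

```scala
import java.sql.Timestamp

// The rate source emits rows with a `timestamp` (TimestampType) and a
// monotonically increasing `value` (LongType); a matching case class
// (assumed here, not shown in the question) would be:
case class Rate(timestamp: Timestamp, value: Long)

// Example: converting the timestamp to epoch seconds, as RemoteCopyGroup.get does.
val r = Rate(new Timestamp(1700000000000L), 0L)
val epochSeconds = r.timestamp.getTime / 1000 // 1700000000
```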
I want to do stateful operations on remoteCopyGroupDS, but I am not able to use methods like mapWithState because remoteCopyGroupDS is not a DStream. Is there a way to generate a DStream that continuously emits data, or to convert the current Dataset, i.e. remoteCopyGroupDS, into a DStream?
1 Answer
In Spark, a DStream belongs to the legacy Spark Streaming API: it is a distributed stream of data, represented as a time series of RDDs, that is computed on the fly and used to derive values for other Spark operations. DStreams can be created from a variety of sources including Kafka, Flume, HDFS, and many others, but they are always built from a StreamingContext (for example via KafkaUtils for Kafka sources, or a custom Receiver), not from a Structured Streaming Dataset such as the one produced by the rate source. In other words, there is no direct conversion from remoteCopyGroupDS to a DStream. If what you need is DStream-style stateful processing like mapWithState, the Structured Streaming equivalents are mapGroupsWithState and flatMapGroupsWithState on a grouped Dataset.
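For the pipeline in the question, stateful processing does not require a DStream at all: Structured Streaming offers mapGroupsWithState on a Dataset grouped with groupByKey. A minimal sketch follows; the running-count-per-systemId state and the updateCount helper are my illustrative assumptions, not something from the question. Keeping the update step as a pure function makes it testable outside Spark:

```scala
// Sketch only, assuming the goal is a running count of events per systemId.
case class RemoteCopyGroup(ts: Long, systemId: String, name: String, id: Int,
                           role: String, mode: String, remoteGroupName: String)

// Pure state-update step: fold a batch of events into the running count.
def updateCount(current: Long, events: Iterator[RemoteCopyGroup]): Long =
  current + events.size

// Wiring into Structured Streaming (requires a SparkSession; shown for context):
//
//   import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
//
//   def updateState(systemId: String,
//                   events: Iterator[RemoteCopyGroup],
//                   state: GroupState[Long]): (String, Long) = {
//     val next = updateCount(state.getOption.getOrElse(0L), events)
//     state.update(next)
//     systemId -> next
//   }
//
//   remoteCopyGroupDS
//     .groupByKey { case (systemId, _) => systemId }
//     .mapGroupsWithState(GroupStateTimeout.NoTimeout()) {
//       (key, values, state: GroupState[Long]) =>
//         updateState(key, values.map(_._2), state)
//     }
```

Note that mapGroupsWithState requires the `update` or `append` output mode and, like mapWithState, keeps per-key state across micro-batches.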