如何在 Spark 中从 RateStreamSource 生成 DStream

发布于 2025-01-10 02:11:42 字数 1203 浏览 0 评论 0原文

我在 Scala 中有一个像这样的案例类

case class RemoteCopyGroup(
  ts: Long,
  systemId: String,
  name: String,
  id: Int,
  role: String,
  mode: String,
   remoteGroupName: String)


object RemoteCopyGroup {

    // to be removed

   val arrayOfIds = Array("CZ210507H1", "CZ20030W4H", "CZ29400JBJ")
   def randomSerialNumber = Random.shuffle(arrayOfIds.toList).head

  def get(x: Rate): RemoteCopyGroup = {
    RemoteCopyGroup(
    x.timestamp.getTime / 1000,
    randomSerialNumber,
    Random.nextString(2),
    Random.nextInt(3),
    Random.nextString(2),
    Random.nextString(2),
    Random.nextString(2))
  }
}

我正在使用像这样的 RateStreamSource 生成数据流

val remoteCopyGroupDS: Dataset[(String, RemoteCopyGroup)] = sparkSession
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", rate)
  .load()
  .as[Rate].filter(_.value % 10 == 0)
  .map(RemoteCopyGroup.get).map(rcg => rcg.systemId -> rcg)

我想在 remoteCopyGroupDS 上执行有状态操作,但我无法使用 mapWithState 这样的方法code> 因为 remoteCopyGroupDS 不是 DStream。 有没有办法可以生成连续发出数据的 DStream 或者可以将当前的 DataSet(即 remoteCopyGroupDS)转换为 DStream ?

I have a case class in Scala like this

case class RemoteCopyGroup(
  ts: Long,
  systemId: String,
  name: String,
  id: Int,
  role: String,
  mode: String,
   remoteGroupName: String)


object RemoteCopyGroup {

    // to be removed

   val arrayOfIds = Array("CZ210507H1", "CZ20030W4H", "CZ29400JBJ")
   def randomSerialNumber = Random.shuffle(arrayOfIds.toList).head

  def get(x: Rate): RemoteCopyGroup = {
    RemoteCopyGroup(
    x.timestamp.getTime / 1000,
    randomSerialNumber,
    Random.nextString(2),
    Random.nextInt(3),
    Random.nextString(2),
    Random.nextString(2),
    Random.nextString(2))
  }
}

I am generating a stream of data using RateStreamSource like this

val remoteCopyGroupDS: Dataset[(String, RemoteCopyGroup)] = sparkSession
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", rate)
  .load()
  .as[Rate].filter(_.value % 10 == 0)
  .map(RemoteCopyGroup.get).map(rcg => rcg.systemId -> rcg)

I want to do stateful operations on remoteCopyGroupDS but I am not able to use methods like mapWithState because remoteCopyGroupDS is not a DStream.
Is there a way I can generate a DStream that continuously emits data or I can convert current DataSet i.e. remoteCopyGroupDS to DStream ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

情仇皆在手 2025-01-17 02:11:42

KafkaRate 是由 Kafka 主题或 Kafka 主题流连续发布的费率/价格数据流。在 Spark 中,DStream 是可以动态计算的分布式数据流。 DStream 表示数据的时间序列,用于计算其他 Spark 操作中使用的值。 DStream 可以从多种来源创建,包括 Kafka、Flume、HDFS 等。 Spark 提供了一个 Receiver 对象来与 Kafka 源交互。您可以使用 SparkContext 中提供的已创建接收器方法从 Kafka Sources 创建接收器对象。

The KafkaRate is a stream of rate/price data that is continuously published by a Kafka Topic or a Kafka Topic Stream. In Spark, the DStream is a distributed stream of data that can be computed on the fly. DStreams represent a time series of data and are used to compute values for use in other Spark operations. DStreams can be created from a variety of sources including Kafka, Flume, HDFS, and many others. Spark provides a Receiver object to interface with Kafka sources. You can create Receiver objects from Kafka Sources using the created receiver method that is provided in the SparkContext.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文