Iterator is empty inside the state function

Posted on 2025-01-12 04:22:20

I have case classes like the ones below, and I am generating test data with RateStreamSource, which gives me a Dataset. I then group the Dataset with groupByKey and call mapGroupsWithState.

However, the state function updateRateAnother contains some logic, and I print the iterator there. The iterator always comes out empty inside the method, so my logic does not work.

Following is a minimal reproducible example of the code:

import java.sql.Timestamp

import scala.util.Random

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Employee(id: String, value: Long)
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)

object ResourceConfigConsolidator {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName("TestJob")
      .getOrCreate()

    import sparkSession.implicits._

    val rate = 2
    val randoms = List(10, 20, 30, 40, 50, 60, 70)
    def randomElement = Random.shuffle(randoms).head

    // The rate source produces rows with the schema (timestamp: Timestamp, value: Long)
    val rcConfigDS = sparkSession
      .readStream
      .format("rate") // <-- use RateStreamSource
      .option("rowsPerSecond", rate)
      .load()
      .as[Rate]
      .filter(_.value % 40 == 0)
      .map(r => Rate2(r.timestamp, r.value, randomElement))

    def updateRateAnother(key: Int, values: Iterator[Rate2],
                          state: GroupState[Employee]): Option[Employee] = {
      println("key is here ::" + key)
      if (state.hasTimedOut) {
        // We've timed out: read the state before removing it, then send it down the stream
        val lastState = state.getOption
        state.remove()
        lastState
      } else {
        println("the iterating values ::::" + values.toList.mkString(" , \n"))
        println("hello length ::::" + values.length)
        if (!state.exists) {
          if (values.length == 0) {
            None
          } else {
            val latestValue = values.toList.maxBy(_.value)
            val employee = Employee(latestValue.value.toString, latestValue.value)
            state.update(employee)
            Some(employee)
          }
        } else {
          if (values.isEmpty) {
            val currentState = state.get
            Some(currentState)
          } else {
            val latestValue = values.toList.maxBy(_.value)
            val currentState = state.get
            val updated = currentState.copy(latestValue.value.toString, latestValue.value)
            state.update(updated)
            Some(updated)
          }
        }
      }
    }

    val res: Dataset[Employee] = rcConfigDS
      .groupByKey(_.age)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother)
      .flatMap(emp => emp)

    res.writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .option("truncate", value = false)
      .option("checkpointLocation", "checkpoint1")
      .start()
      .awaitTermination() // keep the driver alive while the streaming query runs
  }
}

Since I am grouping by age, there should be at least one object in the iterator. Am I right? Why does the iterator come out empty?

Comments (1)

口干舌燥 2025-01-19 04:22:20

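Are you sure it is empty when you first print it? That would be the only surprising part. An iterator can be traversed only once, so as soon as you call values.toList for the first time, it is exhausted. values.length is a traversal as well, so the values.length == 0 check that follows is always true, the state is never set, and the function returns None. You should assign the result of toList to a variable and discard the iterator.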
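A minimal, Spark-free sketch of this behaviour in plain Scala. The object IteratorOnceDemo, its two helper methods and the sample values are hypothetical; each helper stands in for the start of updateRateAnother, with the iterator Spark would pass replaced by a hand-built Iterator[Long]. The first helper repeats the question's pattern (toList, then length), the second applies the fix of materialising the iterator once.

```scala
object IteratorOnceDemo {
  // Mirrors the question: print via toList, then ask the same iterator for its length
  def lengthAfterPrinting(values: Iterator[Long]): Int = {
    println("the iterating values ::::" + values.toList.mkString(" , "))
    values.length // already drained by toList above, so this is always 0
  }

  // The fix from the answer: call toList once and use only the List afterwards
  def lengthAfterBuffering(values: Iterator[Long]): Int = {
    val buffered = values.toList
    println("the iterating values ::::" + buffered.mkString(" , "))
    buffered.length
  }

  def main(args: Array[String]): Unit = {
    println(lengthAfterPrinting(Iterator(40L, 80L, 120L)))  // prints the values, then 0
    println(lengthAfterBuffering(Iterator(40L, 80L, 120L))) // prints the values, then 3
  }
}
```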

Better yet, change your logic so that it needs only one pass, and get rid of toList (you can call maxBy on the Iterator directly, but only once). The idea is not to load all the data into memory at once when dealing with a large dataset.
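One way to do the single pass, sketched under the assumption that the state logic is pulled out into a pure helper (SinglePassUpdate.update is a hypothetical name, not part of Spark). It reuses the Employee and Rate2 case classes from the question and uses reduceOption instead of maxBy, because maxBy throws on an empty iterator while reduceOption simply returns None.

```scala
import java.sql.Timestamp

// Same case classes as in the question
case class Employee(id: String, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)

object SinglePassUpdate {
  // Hypothetical pure helper: `current` plays the role of state.getOption and the
  // result is the new state to store and emit (None means nothing to emit).
  def update(current: Option[Employee], values: Iterator[Rate2]): Option[Employee] = {
    // Single traversal: keep the row with the largest value (the first one wins on ties,
    // like maxBy). reduceOption yields None for an empty iterator instead of throwing.
    val latest: Option[Rate2] =
      values.reduceOption((a, b) => if (b.value > a.value) b else a)

    latest match {
      // New rows: Employee has only two fields, so this matches both the
      // "new Employee" branch and the "currentState.copy(...)" branch of the question
      case Some(r) => Some(Employee(r.value.toString, r.value))
      // No new rows: keep and re-emit the existing state, if there is one
      case None => current
    }
  }

  def main(args: Array[String]): Unit = {
    val ts = new Timestamp(0L)
    val rows = Iterator(Rate2(ts, 40L, 10), Rate2(ts, 120L, 10), Rate2(ts, 80L, 10))
    println(update(None, rows))                                // Some(Employee(120,120))
    println(update(Some(Employee("40", 40L)), Iterator.empty)) // Some(Employee(40,40))
  }
}
```

Inside updateRateAnother, the non-timeout branch could then become something like val next = SinglePassUpdate.update(state.getOption, values); next.foreach(state.update); next, which touches the iterator exactly once. Treat this as a sketch to adapt rather than a drop-in replacement.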
