Flink KeyedCoProcessFunction state handling

Posted 2025-01-17 09:10:03


I use a KeyedCoProcessFunction to enrich the main data stream with data that comes from another stream.

Code:

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
        out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      case _ => logger.debug("Smth went wrong")
    }
  }

  override def processElement2(
                                value: AssetCommandState,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}

processElement2() works well; it accepts the data and updates the state.
But in processElement1() I always hit case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"),

although I expect to see the value that was set in the processElement2 function.

as an example I used this guide - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/

Comments (1)

等往事风中吹 2025-01-24 09:10:03


processElement1 and processElement2 do share state, but keep in mind that this is key-partitioned state. This means that a value set in processElement2 when processing a given value v2 will only be seen in processElement1 when it is called later with a value v1 having the same key as v2.
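
In practice that means the job wiring (not shown in the question) has to key both inputs by the same logical key before they reach the function. A minimal sketch of such wiring, assuming hypothetical stream names and key selectors:

import org.apache.flink.streaming.api.scala._

// Hypothetical wiring - packetStream, commandStream and the key fields are
// assumptions, not taken from the question. Both keyBy calls must produce the
// same logical key, otherwise the state written in processElement2 lives under
// a different key than the one processElement1 reads.
val enriched: DataStream[AssetData] =
  packetStream                                                     // DataStream[PacketData]
    .keyBy((p: PacketData) => p.tag.externalId.get)
    .connect(commandStream.keyBy((c: AssetCommandState) => c.id))  // DataStream[AssetCommandState]
    .process(new AssetDataEnrichment())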

Also keep in mind that you have no control over the race condition between the two streams coming into processElement1 and processElement2.

The RidesAndFares exercise from the official Apache Flink training is all about learning to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home for the corresponding tutorial.
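
A minimal sketch of the buffering pattern that exercise teaches, reusing the question's types; the ListState buffer, the class name, and the simplified command handling are assumptions, not part of the original code:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

// Sketch only: buffers packets that arrive before the command for their key,
// then flushes them once the asset id is known.
class BufferingAssetEnrichment
    extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] {

  private var assetId: ValueState[String] = _
  private var pendingPackets: ListState[PacketData] = _   // hypothetical buffer

  override def open(parameters: Configuration): Unit = {
    assetId = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("assetId", classOf[String]))
    pendingPackets = getRuntimeContext.getListState(
      new ListStateDescriptor[PacketData]("pendingPackets", classOf[PacketData]))
  }

  override def processElement1(
      packet: PacketData,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    val id = assetId.value()
    if (id != null) {
      out.collect(AssetData(id, packet.tag.externalId.get, packet.toString))
    } else {
      pendingPackets.add(packet)  // command for this key has not arrived yet
    }
  }

  override def processElement2(
      cmd: AssetCommandState,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    assetId.update(cmd.assetId)   // command matching from the question omitted for brevity
    val buffered = pendingPackets.get()
    if (buffered != null) {       // flush everything buffered for this key
      val it = buffered.iterator()
      while (it.hasNext) {
        val p = it.next()
        out.collect(AssetData(cmd.assetId, p.tag.externalId.get, p.toString))
      }
    }
    pendingPackets.clear()
  }
}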
