Flink KeyedCoProcessFunction 处理状态
我使用 keyedCoprocessfunction
函数用数据丰富了主数据流,来自另一个流
代码:
class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {
case class AssetStateDoc(assetId: Option[String])
private var associatedDevices: ValueState[AssetStateDoc] = _
override def open(parameters: Configuration): Unit = {
val associatedDevicesDescriptor =
new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
}
override def processElement1(
packet: PacketData,
ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
out: Collector[AssetData]): Unit = {
val tmpState = associatedDevices.value
val state = if (tmpState == null) AssetStateDoc(None) else tmpState
state.assetId match {
case Some(assetId) =>
logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
case _ => logger.debug("Smth went wrong")
}
}
override def processElement2(
value: AssetCommandState,
ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
out: Collector[AssetData]): Unit = {
value.command match {
case CREATE =>
logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
logger.debug(s"current state is ${associatedDevices.value()}")
associatedDevices.update(AssetStateDoc(Some(value.assetId)))
logger.debug(s"new state is ${associatedDevices.value()}")
case _ =>
logger.error("Got unknown AssetCommandState command")
}
}
}
processElement2()
效果很好,它是接受数据并更新状态。
但是在 processElement1()
中,我总是击中 case none => 对于数据包$ {packet.tag.externalId}”)
logger.debug(s“无状态,
>我使用了本指南 - https://nightlies.apache.org/flink/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
processElement1
和processElement2
确实共享状态,但请记住,这是关键分区状态。这意味着ProcessElement2
中设置的值时,处理给定值 v2 时,仅在processElement1
中看到它以稍后使用值< em> v1 具有与 v2 相同的密钥。还要记住,您无法控制
ProcessElement1
和ProcessElement2
之间的两个流之间的比赛条件。ridesandfares练习来自官方的apache flink训练。关于学习与API的这一部分合作。 是相应教程的家。
processElement1
andprocessElement2
do share state, but keep in mind that this is key-partitioned state. This means that a value set inprocessElement2
when processing a given value v2 will only be seen inprocessElement1
when it is called later with a value v1 having the same key as v2.Also keep in mind that you have no control over the race condition between the two streams coming into
processElement1
andprocessElement2
.The RidesAndFares exercise from the official Apache Flink training is all about learning to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home for the corresponding tutorial.