如何与actor实现异步相互依赖的可取消操作?
我对 Actor 模型还很陌生,这就是为什么我认为已经有既定的模式可以用 Actor 和 future 等美丽的可组合抽象来解决我的常见场景。
我的异步操作具有以下要求:
- 它们通过发送低级请求然后通过轮询监视实体的状态来使用遗留系统。因此,实际操作的结果只能以延迟的方式获得,当观察到的状态达到所需状态时,必须通知请求者。
- 这些操作只有在其他一些操作完成后才能发出,并且它们应该并行等待。
- 可以取消操作。当然,已经发出的低级请求无法撤消;取消意味着在我们依赖的操作完成后不发出实际操作,当然这必须递归传播(如果我们等待依赖项,并且它有多个挂起的操作,则不要发出它们)。
我在Futures中思考:第一个要求可以用Akka的map
/flatMap
来解决,第二个要求可以用traverse
组合器来解决,而无需维护依赖关系/程序上的依赖者。但我想不出取消的解决方案; futures 不能被取消,如果组合,它们的组件也无法访问。如何以函数式方式封装“取消当前操作”? Scala 的 Actor 框架是否支持此功能?
I am quite new to the Actor model, that's why I think there are already established patterns addressing my common-looking scenario with such beautiful composable abstractions as actors and futures.
I have asynchronous operations with the following requirements:
- They use a legacy system by sending out a low-level request and then monitoring the state of the entity with polling. So the result of an actual operation is only available in a deferred way, requesters have to be notified when the observed state reaches the desired state.
- These operations can be issued only after some other operations are finished, for which they should wait in parallel.
- The operations can be cancelled. Of course, already issued low-level requests cannot be undone; cancellation means don't issue the actual operation after the operations which we depend on finished, and of course this has to be propagated recursively (if we wait for a dependency, and it has multiple pending operations, don't issue them).
I'm thinking in Futures: the first requirement can be solved with e.g. Akka's map
/flatMap
, the second with the traverse
combinator without maintaining dependencies/dependents procedurally. But I can't think of a solution for cancellation; futures cannot be cancelled, and if composed, their components are not reachable. How to encapsulate "cancel the current operation" in a functional way? Does any of the Actor frameworks for Scala support this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
使用监听器: https://github.com/jboner/akka/blob/release-1.2/akka-actor/src/main/scala/akka/routing/Listeners.scala
制作一个使用的 Actor侦听器将轮询状态传播给所有侦听器。然后您可以使用消息传递循环来重新启动轮询。
然后,您可以使用 stop 或发送 PoisonPill 来停止 actor。
这有帮助吗?
Use Listeners: https://github.com/jboner/akka/blob/release-1.2/akka-actor/src/main/scala/akka/routing/Listeners.scala
Make an Actor that uses Listeners to propagate the state of the polling to any and all listeners. Then you can use message-passing looping to reinitiate the polling.
You can then stop the actor using either stop or sending the PoisonPill.
Does that help?
Guava 的 ListenableFuture支持取消到 绑定在一起(但当 从集合中组合)。
Guava's ListenableFutures support cancellation to a level when bound together (but not when combined from a collection).