如何与actor实现异步相互依赖的可取消操作?

发布于 2024-12-02 00:51:19 字数 574 浏览 1 评论 0原文

我对 Actor 模型还很陌生,这就是为什么我认为已经有既定的模式可以用 Actor 和 future 等美丽的可组合抽象来解决我的常见场景。

我的异步操作具有以下要求:

  • 它们通过发送低级请求然后通过轮询监视实体的状态来使用遗留系统。因此,实际操作的结果只能以延迟的方式获得,当观察到的状态达到所需状态时,必须通知请求者。
  • 这些操作只有在其他一些操作完成后才能发出,并且它们应该并行等待。
  • 可以取消操作。当然,已经发出的低级请求无法撤消;取消意味着在我们依赖的操作完成后不发出实际操作,当然这必须递归传播(如果我们等待依赖项,并且它有多个挂起的操作,则不要发出它们)。

我在Futures中思考:第一个要求可以用Akka的map/flatMap来解决,第二个要求可以用traverse组合器来解决,而无需维护依赖关系/程序上的依赖者。但我想不出取消的解决方案; futures 不能被取消,如果组合,它们的组件也无法访问。如何以函数式方式封装“取消当前操作”? Scala 的 Actor 框架是否支持此功能?

I am quite new to the Actor model, that's why I think there are already established patterns addressing my common-looking scenario with such beautiful composable abstractions as actors and futures.

I have asynchronous operations with the following requirements:

  • They use a legacy system by sending out a low-level request and then monitoring the state of the entity with polling. So the result of an actual operation is only available in a deferred way, requesters have to be notified when the observed state reaches the desired state.
  • These operations can be issued only after some other operations are finished, for which they should wait in parallel.
  • The operations can be cancelled. Of course, already issued low-level requests cannot be undone; cancellation means don't issue the actual operation after the operations which we depend on finished, and of course this has to be propagated recursively (if we wait for a dependency, and it has multiple pending operations, don't issue them).

I'm thinking in Futures: the first requirement can be solved with e.g. Akka's map/flatMap, the second with the traverse combinator without maintaining dependencies/dependents procedurally. But I can't think of a solution for cancellation; futures cannot be cancelled, and if composed, their components are not reachable. How to encapsulate "cancel the current operation" in a functional way? Does any of the Actor frameworks for Scala support this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

挽清梦 2024-12-09 00:51:19

使用监听器: https://github.com/jboner/akka/blob/release-1.2/akka-actor/src/main/scala/akka/routing/Listeners.scala

制作一个使用的 Actor侦听器将轮询状态传播给所有侦听器。然后您可以使用消息传递循环来重新启动轮询。

class MyActor extends Actor with Listeners {

  override def preStart {
    self ! 'poll //Start looping on start
  }

  def receive = listenerManagement orElse {
    case 'poll => val result = pollYourExternalDude()
    gossip(result)
    self ! 'poll //Loop
  }
}

然后,您可以使用 stop 或发送 PoisonPill 来停止 actor。

这有帮助吗?

Use Listeners: https://github.com/jboner/akka/blob/release-1.2/akka-actor/src/main/scala/akka/routing/Listeners.scala

Make an Actor that uses Listeners to propagate the state of the polling to any and all listeners. Then you can use message-passing looping to reinitiate the polling.

class MyActor extends Actor with Listeners {

  override def preStart {
    self ! 'poll //Start looping on start
  }

  def receive = listenerManagement orElse {
    case 'poll => val result = pollYourExternalDude()
    gossip(result)
    self ! 'poll //Loop
  }
}

You can then stop the actor using either stop or sending the PoisonPill.

Does that help?

梅窗月明清似水 2024-12-09 00:51:19

Guava 的 ListenableFuture支持取消到 绑定在一起(但当 从集合中组合)。

Guava's ListenableFutures support cancellation to a level when bound together (but not when combined from a collection).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文