返回介绍

7.10 询问模式:Actor 中的 Future

发布于 2024-08-21 22:20:21 字数 2858 浏览 0 评论 0 收藏 0

由于Actor之间都是通过异步消息通信的。当你发送一条消息给一个Actor后,你通常只能等待Actor的返回。与同步方法不同,在你发送异步消息后,接受消息的Actor可能还根本来不及处理你的消息,而调用方就已经返回了。

这种模式与我们之前提到的Future模式非常相像。不同之处只是在传统的异步调用中,我们进行的是函数调用,但在这里,我们发送了一条消息。

因为两者的行为方式是如此相像,因此我们就会很自然地想到,当我们需要一个有返回值的调用时,Actor是不是也应该给我们一个契约(Future)呢?这样,就算我们当下没有办法立即获得Actor的处理结果,在将来,通过这个契约还是可以追踪到我们的请求的。

01 import static akka.pattern.Patterns.ask;
02 import static akka.pattern.Patterns.pipe;
03
04 public class AskMain {
05
06   public static void main(String[] args) throws Exception {
07     ActorSystem system = ActorSystem.create("askdemo", ConfigFactory.load("samplehello.conf"));
08     ActorRef worker = system.actorOf(Props.create(MyWorker.class), "worker");
09     ActorRef printer = system.actorOf(Props.create(Printer.class), "printer");
10     system.actorOf(Props.create(WatchActor.class, worker), "watcher");
11
12     //等待future返回
13     Future<Object> f = ask(worker, 5, 1500);
14     int re = (int) Await.result(f, Duration.create(6, TimeUnit.SECONDS));
15     System.out.println("return:" + re);
16
17     //直接导向其他Actor,pipe不会等待
18     f = ask(worker, 6, 1500);
19     pipe(f, system.dispatcher()).to(printer);
20
21     worker.tell(PoisonPill.getInstance(), ActorRef.noSender());
22   }
23 }

上述代码给出了两处在Actor交互中使用Future的例子。

在第13行,使用ask()方法给worker发送消息,消息内容是5,也就说worker会接收到一个Integer消息,值为5。当workder接收到消息后,就可以进行计算处理,并且将结果返回给发送者。当然,这个处理过程可能需要花费一点时间。

方法ask()不会等待worker处理,会立即返回一个Future对象(第13行)。在第14行,我们使用Await方法等待worker的返回,接着在第15行打印返回结果。

在这种方法中,我们间接地将一个异步调用转为同步阻塞调用。虽然比较容易理解,但是在有些场合可能会出现性能问题。另外一种更为有效的方法是使用pipe()函数。

代码第18行使用ask()再次询问worker,并传递数值6给worker。接着并不进行等待,而是使用pipe()将这个Future重定向到另外一个称为printer的Actor。pipe()函数不会阻塞程序,会立即返回。

这个printer的实现很简单的,只是简单地输出得到的数据:

01 @Override
02 public void onReceive(Object msg) {
03   if (msg instanceof Integer) {
04     System.out.println("Printer:"+msg);
05   }
06   if (msg == Msg.DONE) {
07     log.info("Stop working");
08   }if (msg == Msg.CLOSE) {
09     log.info("I will shutdown");
10     getSender().tell(Msg.CLOSE, getSelf());
11     getContext().stop(getSelf());
12   } else
13     unhandled(msg);
14 }

上述代码就是Printer Actor的实现,它会通过pipe()方法得到worker的输出结果,并打印在控制台上(第4行)。

在本例中,worker Actor接受一个整数,并计算它的平方,并给予返回。如下:

01 @Override
02 public void onReceive(Object msg) {
03   if (msg instanceof Integer) {
04     int i=(Integer)msg;
05     try {
06       Thread.sleep(1000);
07     } catch (InterruptedException e) {}
08     getSender().tell(i*i, getSelf());
09   }
10   if (msg == Msg.DONE) {
11     log.info("Stop working");
12   }if (msg == Msg.CLOSE) {
13     log.info("I will shutdown");
14     getSender().tell(Msg.CLOSE, getSelf());
15     getContext().stop(getSelf());
16   } else
17     unhandled(msg);
18 }

上述代码第5~7行,模拟了一个耗时的调用,为了更明显地说明ask()和pipe()方法的用途。第8行,worker计算了给定数值的平方,并把它“告诉”请求者。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文