7.10 询问模式:Actor 中的 Future
由于Actor之间都是通过异步消息通信的。当你发送一条消息给一个Actor后,你通常只能等待Actor的返回。与同步方法不同,在你发送异步消息后,接受消息的Actor可能还根本来不及处理你的消息,而调用方就已经返回了。
这种模式与我们之前提到的Future模式非常相像。不同之处只是在传统的异步调用中,我们进行的是函数调用,但在这里,我们发送了一条消息。
因为两者的行为方式是如此相像,因此我们就会很自然地想到,当我们需要一个有返回值的调用时,Actor是不是也应该给我们一个契约(Future)呢?这样,就算我们当下没有办法立即获得Actor的处理结果,在将来,通过这个契约还是可以追踪到我们的请求的。
01 import static akka.pattern.Patterns.ask; 02 import static akka.pattern.Patterns.pipe; 03 04 public class AskMain { 05 06 public static void main(String[] args) throws Exception { 07 ActorSystem system = ActorSystem.create("askdemo", ConfigFactory.load("samplehello.conf")); 08 ActorRef worker = system.actorOf(Props.create(MyWorker.class), "worker"); 09 ActorRef printer = system.actorOf(Props.create(Printer.class), "printer"); 10 system.actorOf(Props.create(WatchActor.class, worker), "watcher"); 11 12 //等待future返回 13 Future<Object> f = ask(worker, 5, 1500); 14 int re = (int) Await.result(f, Duration.create(6, TimeUnit.SECONDS)); 15 System.out.println("return:" + re); 16 17 //直接导向其他Actor,pipe不会等待 18 f = ask(worker, 6, 1500); 19 pipe(f, system.dispatcher()).to(printer); 20 21 worker.tell(PoisonPill.getInstance(), ActorRef.noSender()); 22 } 23 }
上述代码给出了两处在Actor交互中使用Future的例子。
在第13行,使用ask()方法给worker发送消息,消息内容是5,也就说worker会接收到一个Integer消息,值为5。当workder接收到消息后,就可以进行计算处理,并且将结果返回给发送者。当然,这个处理过程可能需要花费一点时间。
方法ask()不会等待worker处理,会立即返回一个Future对象(第13行)。在第14行,我们使用Await方法等待worker的返回,接着在第15行打印返回结果。
在这种方法中,我们间接地将一个异步调用转为同步阻塞调用。虽然比较容易理解,但是在有些场合可能会出现性能问题。另外一种更为有效的方法是使用pipe()函数。
代码第18行使用ask()再次询问worker,并传递数值6给worker。接着并不进行等待,而是使用pipe()将这个Future重定向到另外一个称为printer的Actor。pipe()函数不会阻塞程序,会立即返回。
这个printer的实现很简单的,只是简单地输出得到的数据:
01 @Override 02 public void onReceive(Object msg) { 03 if (msg instanceof Integer) { 04 System.out.println("Printer:"+msg); 05 } 06 if (msg == Msg.DONE) { 07 log.info("Stop working"); 08 }if (msg == Msg.CLOSE) { 09 log.info("I will shutdown"); 10 getSender().tell(Msg.CLOSE, getSelf()); 11 getContext().stop(getSelf()); 12 } else 13 unhandled(msg); 14 }
上述代码就是Printer Actor的实现,它会通过pipe()方法得到worker的输出结果,并打印在控制台上(第4行)。
在本例中,worker Actor接受一个整数,并计算它的平方,并给予返回。如下:
01 @Override 02 public void onReceive(Object msg) { 03 if (msg instanceof Integer) { 04 int i=(Integer)msg; 05 try { 06 Thread.sleep(1000); 07 } catch (InterruptedException e) {} 08 getSender().tell(i*i, getSelf()); 09 } 10 if (msg == Msg.DONE) { 11 log.info("Stop working"); 12 }if (msg == Msg.CLOSE) { 13 log.info("I will shutdown"); 14 getSender().tell(Msg.CLOSE, getSelf()); 15 getContext().stop(getSelf()); 16 } else 17 unhandled(msg); 18 }
上述代码第5~7行,模拟了一个耗时的调用,为了更明显地说明ask()和pipe()方法的用途。第8行,worker计算了给定数值的平方,并把它“告诉”请求者。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论