7.8 消息路由
Akka提供了非常灵活的消息发送机制。有时候,我们也许会使用一组Actor而不是一个Actor来提供一项服务。这一组Actor中所有的Actor都是对等的,也就是说你可以找任何一个Actor来为你服务。这种情况下,如何才能快速有效地找到合适的Actor呢?或者说如何调度这些消息,才可以使负载更为均衡地分配在这一组Actor中。
为了解决这个问题,Akka使用一个路由器组件(Router)来封装消息的调度。系统提供了几种实用的消息路由策略,比如,轮询选择Actor进行消息发送,随机消息发送,将消息发送给最为空闲的Actor,甚至是在组内广播消息。
下面就来演示一下消息路由的使用方式:
01 public class WatchActor extends UntypedActor { 02 private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 03 public Router router; 04 { 05 List<Routee> routees=new ArrayList<Routee>(); 06 for(int i=0;i<5;i++){ 07 ActorRef worker = getContext().actorOf(Props.create(MyWorker.class),"worker_"+i); 08 getContext().watch(worker); 09 routees.add(new ActorRefRoutee(worker)); 10 } 11 router=new Router(new RoundRobinRoutingLogic(),routees); 12 } 13 14 @Override 15 public void onReceive(Object msg) { 16 if(msg instanceof MyWorker.Msg){ 17 router.route(msg, getSender()); 18 }else if (msg instanceof Terminated) { 19 router=router.removeRoutee(((Terminated)msg).actor()); 20 System.out.println(((Terminated)msg).actor().path()+" is closed,routees="+router. routees().size()); 21 if(router.routees().size()==0){ 22 System.out.println("Close system"); 23 RouteMain.flag.send(false); 24 getContext().system().shutdown(); 25 } 26 } else { 27 unhandled(msg); 28 } 29 } 30 }
上述代码中定义了WatchActor。第3行,就是路由器组件Router,在构造Router时,需要指定路由策略和一组被路由的Actor(Routee),如第11行所示。这里使用了RoundRobinRoutingLogic路由策略,也就是对所有的Routee进行轮询消息发送。在本例中,Routee由5个MyWorker Actor构成(第6~10行,MyWorker与上一节中的相同,故不再给出代码)。
当有消息需要传递给这5个MyWorker时,只需要将消息投递给这个Router即可(上述代码第17行)。Router就会根据给定的消息路由策略进行消息投递。当一个MyWorker停止工作时,还可以简单地将其从工作组中移除(第19行)。在这里,如果发现系统中没有可用的Actor,就会直接关闭系统。
主函数比较简单,如下:
01 public class RouteMain { 02 public static Agent<Boolean> flag=Agent.create(true, ExecutionContexts.global()); 03 public static void main(String[] args) throws InterruptedException { 04 ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf")); 05 ActorRef w=system.actorOf(Props.create(WatchActor.class), "watcher"); 06 int i=1; 07 while(flag.get()){ 08 w.tell(MyWorker.Msg.WORKING, ActorRef.noSender()); 09 if(i%10==0)w.tell(MyWorker.Msg.CLOSE, ActorRef.noSender()); 10 i++; 11 Thread.sleep(100); 12 } 13 } 14 }
上述代码向WatchActor发送大量消息,其中夹杂着几条关闭Actor的消息。这会使得MyWorker Actor逐一被关闭,最终程序将退出。
这段程序的部分输出如下(做过适量裁剪):
[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_0] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_1] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_2] I am working [INFO][route-akka.actor.default-dispatcher-4] [akka://route/user/watcher/worker_3] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_4] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_0] I am working ... [INFO][route-akka.actor.default-dispatcher-2] [akka://route/user/watcher/worker_0] I will shutdown akka://route/user/watcher/worker_1 is closed,routees=0 Close system
可以看到,WORKING消息被轮流发送给这5个worker。大家可以修改路由策略,观察不同路由策略下的消息投递方式(除了RoundRobinRoutingLogic外,还可以尝试BroadcastRoutingLogic广播策略、RandomRoutingLogic随机投递策略、SmallestMailboxRoutingLogic空闲Actor优先投递策略)。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论