返回介绍

7.8 消息路由

发布于 2024-08-21 22:20:21 字数 3459 浏览 0 评论 0 收藏 0

Akka提供了非常灵活的消息发送机制。有时候,我们也许会使用一组Actor而不是一个Actor来提供一项服务。这一组Actor中所有的Actor都是对等的,也就是说你可以找任何一个Actor来为你服务。这种情况下,如何才能快速有效地找到合适的Actor呢?或者说如何调度这些消息,才可以使负载更为均衡地分配在这一组Actor中。

为了解决这个问题,Akka使用一个路由器组件(Router)来封装消息的调度。系统提供了几种实用的消息路由策略,比如,轮询选择Actor进行消息发送,随机消息发送,将消息发送给最为空闲的Actor,甚至是在组内广播消息。

下面就来演示一下消息路由的使用方式:

01 public class WatchActor extends UntypedActor {
02   private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
03   public Router router;
04   {
05     List<Routee> routees=new ArrayList<Routee>();
06     for(int i=0;i<5;i++){
07      ActorRef worker = getContext().actorOf(Props.create(MyWorker.class),"worker_"+i);
08       getContext().watch(worker);
09       routees.add(new ActorRefRoutee(worker));
10     }
11     router=new Router(new RoundRobinRoutingLogic(),routees);
12   }
13
14   @Override
15   public void onReceive(Object msg) {
16     if(msg instanceof  MyWorker.Msg){
17       router.route(msg, getSender());
18     }else if (msg instanceof Terminated) {
19     router=router.removeRoutee(((Terminated)msg).actor());
20     System.out.println(((Terminated)msg).actor().path()+" is closed,routees="+router.
routees().size());
21     if(router.routees().size()==0){
22       System.out.println("Close system");
23       RouteMain.flag.send(false);
24       getContext().system().shutdown();
25     }
26     } else {
27     unhandled(msg);
28     }
29   }
30 }

上述代码中定义了WatchActor。第3行,就是路由器组件Router,在构造Router时,需要指定路由策略和一组被路由的Actor(Routee),如第11行所示。这里使用了RoundRobinRoutingLogic路由策略,也就是对所有的Routee进行轮询消息发送。在本例中,Routee由5个MyWorker Actor构成(第6~10行,MyWorker与上一节中的相同,故不再给出代码)。

当有消息需要传递给这5个MyWorker时,只需要将消息投递给这个Router即可(上述代码第17行)。Router就会根据给定的消息路由策略进行消息投递。当一个MyWorker停止工作时,还可以简单地将其从工作组中移除(第19行)。在这里,如果发现系统中没有可用的Actor,就会直接关闭系统。

主函数比较简单,如下:

01 public class RouteMain {
02   public static Agent<Boolean> flag=Agent.create(true, ExecutionContexts.global());
03   public static void main(String[] args) throws InterruptedException {
04   ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf"));
05   ActorRef w=system.actorOf(Props.create(WatchActor.class), "watcher");
06   int i=1;
07   while(flag.get()){
08     w.tell(MyWorker.Msg.WORKING, ActorRef.noSender());
09     if(i%10==0)w.tell(MyWorker.Msg.CLOSE, ActorRef.noSender());
10     i++;
11     Thread.sleep(100);
12   }
13   }
14 }

上述代码向WatchActor发送大量消息,其中夹杂着几条关闭Actor的消息。这会使得MyWorker Actor逐一被关闭,最终程序将退出。

这段程序的部分输出如下(做过适量裁剪):

[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_0] I am
working
[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_1] I am
working
[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_2] I am
working
[INFO][route-akka.actor.default-dispatcher-4] [akka://route/user/watcher/worker_3] I am
working
[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_4] I am
working
[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_0] I am
working
...
[INFO][route-akka.actor.default-dispatcher-2] [akka://route/user/watcher/worker_0] I will
shutdown
akka://route/user/watcher/worker_1 is closed,routees=0
Close system

可以看到,WORKING消息被轮流发送给这5个worker。大家可以修改路由策略,观察不同路由策略下的消息投递方式(除了RoundRobinRoutingLogic外,还可以尝试BroadcastRoutingLogic广播策略、RandomRoutingLogic随机投递策略、SmallestMailboxRoutingLogic空闲Actor优先投递策略)。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文