返回介绍

7.5 监督策略

发布于 2024-08-21 22:20:21 字数 6876 浏览 0 评论 0 收藏 0

如果一个Actor在执行过程中发生意外,比如没有处理某些异常,导致出错,那么这个时候应该怎么办呢?系统是应该当做什么都没发生过,继续执行,还是认为遇到了一个系统性的错误而重启Actor甚至是它所有的兄弟Actor呢?

对于这种情况,Akka框架给予了我们足够的控制权。在Akka框架内,父Actor可以对子Actor进行监督,监控Actor的行为是否有异常。大体上,监督策略可以分为两种:一种是OneForOneStrategy的监督,另外一种是AllForOneStrategy。

对于OneForOneStrategy的策略,父Actor只会对出问题的子Actor进行处理,比如重启或者停止,而对于AllForOneStrategy,父Actor会对出问题的子Actor以及它所有的兄弟都进行处理。很显然,对于AllForOneStrategy策略,它更加适合于各个Actor联系非常紧密的场景,如果多个Actor间只要有一个Actor出现故障,则宣告整个任务失败,就比较适合使用AllForOneStrategy,否则,在更多的场景中,应该使用OneForOneStrategy。当然了,OneForOneStrategy也是Akka的默认策略。

在一个指定的策略中,我们可以对Actor的失败情况进行相应的处理,比如:当失败时,我们可以无视这个错误,继续执行Actor,就像什么事都没发生过一样。或者可以重启这个Actor,甚至可以让这个Actor彻底停止工作。要指定这些监督行为,只要构造一个自定义的监督策略即可。

下面让我们简单看一下SupervisorStrategy的使用和设置。首先,需要定一个父Actor,它作为所有子Actor的监督者:

01 public class Supervisor extends UntypedActor {
02 private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create(1, TimeUnit.MINUTES),
03       new Function<Throwable, Directive>() {
04         @Override
05         public Directive apply(Throwable t) {
06           if (t instanceof ArithmeticException) {
07             System.out.println("meet ArithmeticException,just resume");
08             return SupervisorStrategy.resume();
09           } else if (t instanceof NullPointerException) {
10             System.out.println("meet NullPointerException,restart");
11             return SupervisorStrategy.restart();
12           } else if (t instanceof IllegalArgumentException) {
13             return SupervisorStrategy.stop();
14           } else {
15             return SupervisorStrategy.escalate();
16           }
17         }
18       });
19
20   @Override
21   public SupervisorStrategy supervisorStrategy() {
22     return strategy;
23   }
24
25   public void onReceive(Object o) {
26     if (o instanceof Props) {
27       getContext().actorOf((Props) o,"restartActor");
28     } else {
29       unhandled(o);
30     }
31   }
32 }

上述代码第2~18行,定义了一个OneForOneStrategy的监督策略。在这个监督策略中,运行Actor在遇到错误后,在1分钟内进行3次重试。如果超过这个频率,那么就会直接杀死Actor。具体的策略由第5~16行定义。这里的含义是,当遇到ArithmeticException异常时(比如除以0的错误),继续指定这个Actor,不做任何处理(第8行);当遇到空指针时,进行Actor的重启(第11行)。如果遇到IllegalArgumentException异常,则直接停止Actor(第13行)。对于在这个函数中没有涉及的异常,则向上抛出,由更顶层的Actor处理(第15行)。

第20~23行覆盖父类的supervisorStrategy()方法,设置使用自定义的监督策略。

第27行用来新建一个名为restartActor的子Actor,这个子Actor就由当前的Supervisor进行监督了。当Supervisor接收一个Props对象时,就会根据这个Props配置生成一个restartActor。

RestartActor的实现如下:

01 public class RestartActor extends UntypedActor { 02 public enum Msg { 03 DONE, RESTART 04 } 05 06 @Override 07 public void preStart() { 08 System.out.println("preStart hashcode:" + this.hashCode()); 09 } 10 11 @Override 12 public void postStop() { 13 System.out.println("postStop hashcode:" + this.hashCode()); 14 } 15 16 @Override 17 public void postRestart(Throwable reason) throws Exception { 18 super.postRestart(reason); 19 System.out.println("postRestart hashcode:" + this.hashCode()); 20 } 21 22 @Override 23 public void preRestart(Throwable reason,Option opt) throws Exception { 24 System.out.println("preRestart hashcode:" + this.hashCode()); 25 } 26 27 @Override 28 public void onReceive(Object msg) { 29 if (msg == Msg.DONE) { 30 getContext().stop(getSelf()); 31 } else if (msg == Msg.RESTART) { 32 System.out.println(((Object)null).toString()); 33 //抛出异常 默认会被restart,但这里会resume 34 double a = 0 / 0; 35 } 36 unhandled(msg); 37 } 38 }

第6~25行,定义了一些Actor的生命周期的回调接口。目的是更好地观察Actor的活动情况。在第32~34行模拟了一些异常情况,第32行会抛出NullPointerException,而第34行因为除以零,所以会抛出ArithmeticException。

主函数如下定义:

01 public static void customStrategy(ActorSystem system){
02   ActorRef a = system.actorOf(Props.create(Supervisor.class), "Supervisor");
03   a.tell(Props.create(RestartActor.class), ActorRef.noSender());
04
05   ActorSelection
sel=system.actorSelection("akka://lifecycle/user/Supervisor/restartActor");
06
07   for(int i=0;i<100;i++){
08     sel.tell(RestartActor.Msg.RESTART, ActorRef.noSender());
09   }
10 }
11 public static void main(String[] args) {
12   ActorSystem system = ActorSystem.create("lifecycle",
ConfigFactory.load("lifecycle.conf"));
13   customStrategy(system);
14 }

上述代码中,第12行代码创建了全局ActorSystem,接着在customStrategy()函数中创建了Supervisor Actor,并且对Supervisor发送一个RestartActor的Props(第3行,这个消息会使得Supervisor创建RestartActor)。

接着,选中RestartActor实例(第5行)。第7~9行,向这个RestartActor发送100条RESTART消息。这会使得RestartActor抛出NullPointerException。

执行上述代码,部分输出如下(由于输出太多,这里只截取重要的部分):

01 preStart hashcode:7302437
02 meet NullPointerException,restart
03 preRestart hashcode:7302437
04 [ERROR] [lifecycle-akka.actor.default-dispatcher-3] [akka://lifecycle/user/Supervisor/
restartActor] null
05 java.lang.NullPointerException
06   at geym.akka.demo.lifecycle.RestartActor.onReceive(RestartActor.java:46)
07   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
08   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
09   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
10   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
11   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
12   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
13   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
14   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
15   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
16   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
17   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
18   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
19
20 preStart hashcode:23269863
21 postRestart hashcode:23269863
22 meet NullPointerException,restart
23 preRestart hashcode:23269863
24 preStart hashcode:24918371
25 postRestart hashcode:24918371
26 meet NullPointerException,restart
27 preRestart hashcode:24918371
28 preStart hashcode:12844205
29 postRestart hashcode:12844205
30 [ERROR] [lifecycle-akka.actor.default-dispatcher-2]
[akka://lifecycle/user/Supervisor/restartActor] n ull
31 meet NullPointerException,restart
32 .....
33 postStop hashcode:12844205

第1行的preStart表示RestartActor正在初始化,注意它的HashCode为7302437。接着,这个Actor遇到了NullPointerException。根据自定义的策略,这将导致它重启,因此,这就有了第3行的preRestart,因为preRestart在正式重启之前调用,因此HashCode还是7302437,表示当前Actor和上一个Actor还是同一个实例。接着,第4~19行打印了异常信息。

第20行进入了preStart()方法,它的HashCode为23269863。这说明系统已经为这个RestartActor生成了一个新的实例,原有的实例因为重启而被回收。新的实例将代替原有实例继续工作。这说明同一个RestartActor在系统的工作始终,未必能保持同一个实例。重启完成后,调用postRestart()方法(第21行)。实际上,Actor重启后的preStart()方法,就是在postRestart()中调用的(Actor父类的postRestart()会调用preStart()方法)。

在经过3次重启后,超过了监督策略中的单位时间内的重试上限。因此,系统不会再进行尝试,而是直接关闭RestartActor。上述输出中第33行就显示了这个过程,在最后一个RestartActor实例上,执行了停止方法。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文