7.5 监督策略
如果一个Actor在执行过程中发生意外,比如没有处理某些异常,导致出错,那么这个时候应该怎么办呢?系统是应该当做什么都没发生过,继续执行,还是认为遇到了一个系统性的错误而重启Actor甚至是它所有的兄弟Actor呢?
对于这种情况,Akka框架给予了我们足够的控制权。在Akka框架内,父Actor可以对子Actor进行监督,监控Actor的行为是否有异常。大体上,监督策略可以分为两种:一种是OneForOneStrategy的监督,另外一种是AllForOneStrategy。
对于OneForOneStrategy的策略,父Actor只会对出问题的子Actor进行处理,比如重启或者停止,而对于AllForOneStrategy,父Actor会对出问题的子Actor以及它所有的兄弟都进行处理。很显然,对于AllForOneStrategy策略,它更加适合于各个Actor联系非常紧密的场景,如果多个Actor间只要有一个Actor出现故障,则宣告整个任务失败,就比较适合使用AllForOneStrategy,否则,在更多的场景中,应该使用OneForOneStrategy。当然了,OneForOneStrategy也是Akka的默认策略。
在一个指定的策略中,我们可以对Actor的失败情况进行相应的处理,比如:当失败时,我们可以无视这个错误,继续执行Actor,就像什么事都没发生过一样。或者可以重启这个Actor,甚至可以让这个Actor彻底停止工作。要指定这些监督行为,只要构造一个自定义的监督策略即可。
下面让我们简单看一下SupervisorStrategy的使用和设置。首先,需要定一个父Actor,它作为所有子Actor的监督者:
01 public class Supervisor extends UntypedActor { 02 private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create(1, TimeUnit.MINUTES), 03 new Function<Throwable, Directive>() { 04 @Override 05 public Directive apply(Throwable t) { 06 if (t instanceof ArithmeticException) { 07 System.out.println("meet ArithmeticException,just resume"); 08 return SupervisorStrategy.resume(); 09 } else if (t instanceof NullPointerException) { 10 System.out.println("meet NullPointerException,restart"); 11 return SupervisorStrategy.restart(); 12 } else if (t instanceof IllegalArgumentException) { 13 return SupervisorStrategy.stop(); 14 } else { 15 return SupervisorStrategy.escalate(); 16 } 17 } 18 }); 19 20 @Override 21 public SupervisorStrategy supervisorStrategy() { 22 return strategy; 23 } 24 25 public void onReceive(Object o) { 26 if (o instanceof Props) { 27 getContext().actorOf((Props) o,"restartActor"); 28 } else { 29 unhandled(o); 30 } 31 } 32 }
上述代码第2~18行,定义了一个OneForOneStrategy的监督策略。在这个监督策略中,运行Actor在遇到错误后,在1分钟内进行3次重试。如果超过这个频率,那么就会直接杀死Actor。具体的策略由第5~16行定义。这里的含义是,当遇到ArithmeticException异常时(比如除以0的错误),继续指定这个Actor,不做任何处理(第8行);当遇到空指针时,进行Actor的重启(第11行)。如果遇到IllegalArgumentException异常,则直接停止Actor(第13行)。对于在这个函数中没有涉及的异常,则向上抛出,由更顶层的Actor处理(第15行)。
第20~23行覆盖父类的supervisorStrategy()方法,设置使用自定义的监督策略。
第27行用来新建一个名为restartActor的子Actor,这个子Actor就由当前的Supervisor进行监督了。当Supervisor接收一个Props对象时,就会根据这个Props配置生成一个restartActor。
RestartActor的实现如下:
01 public class RestartActor extends UntypedActor { 02 public enum Msg { 03 DONE, RESTART 04 } 05 06 @Override 07 public void preStart() { 08 System.out.println("preStart hashcode:" + this.hashCode()); 09 } 10 11 @Override 12 public void postStop() { 13 System.out.println("postStop hashcode:" + this.hashCode()); 14 } 15 16 @Override 17 public void postRestart(Throwable reason) throws Exception { 18 super.postRestart(reason); 19 System.out.println("postRestart hashcode:" + this.hashCode()); 20 } 21 22 @Override 23 public void preRestart(Throwable reason,Option opt) throws Exception { 24 System.out.println("preRestart hashcode:" + this.hashCode()); 25 } 26 27 @Override 28 public void onReceive(Object msg) { 29 if (msg == Msg.DONE) { 30 getContext().stop(getSelf()); 31 } else if (msg == Msg.RESTART) { 32 System.out.println(((Object)null).toString()); 33 //抛出异常 默认会被restart,但这里会resume 34 double a = 0 / 0; 35 } 36 unhandled(msg); 37 } 38 }
第6~25行,定义了一些Actor的生命周期的回调接口。目的是更好地观察Actor的活动情况。在第32~34行模拟了一些异常情况,第32行会抛出NullPointerException,而第34行因为除以零,所以会抛出ArithmeticException。
主函数如下定义:
01 public static void customStrategy(ActorSystem system){ 02 ActorRef a = system.actorOf(Props.create(Supervisor.class), "Supervisor"); 03 a.tell(Props.create(RestartActor.class), ActorRef.noSender()); 04 05 ActorSelection sel=system.actorSelection("akka://lifecycle/user/Supervisor/restartActor"); 06 07 for(int i=0;i<100;i++){ 08 sel.tell(RestartActor.Msg.RESTART, ActorRef.noSender()); 09 } 10 } 11 public static void main(String[] args) { 12 ActorSystem system = ActorSystem.create("lifecycle", ConfigFactory.load("lifecycle.conf")); 13 customStrategy(system); 14 }
上述代码中,第12行代码创建了全局ActorSystem,接着在customStrategy()函数中创建了Supervisor Actor,并且对Supervisor发送一个RestartActor的Props(第3行,这个消息会使得Supervisor创建RestartActor)。
接着,选中RestartActor实例(第5行)。第7~9行,向这个RestartActor发送100条RESTART消息。这会使得RestartActor抛出NullPointerException。
执行上述代码,部分输出如下(由于输出太多,这里只截取重要的部分):
01 preStart hashcode:7302437 02 meet NullPointerException,restart 03 preRestart hashcode:7302437 04 [ERROR] [lifecycle-akka.actor.default-dispatcher-3] [akka://lifecycle/user/Supervisor/ restartActor] null 05 java.lang.NullPointerException 06 at geym.akka.demo.lifecycle.RestartActor.onReceive(RestartActor.java:46) 07 at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) 08 at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 09 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) 10 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 11 at akka.actor.ActorCell.invoke(ActorCell.scala:487) 12 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 13 at akka.dispatch.Mailbox.run(Mailbox.scala:221) 14 at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 15 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 16 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 17 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 18 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 19 20 preStart hashcode:23269863 21 postRestart hashcode:23269863 22 meet NullPointerException,restart 23 preRestart hashcode:23269863 24 preStart hashcode:24918371 25 postRestart hashcode:24918371 26 meet NullPointerException,restart 27 preRestart hashcode:24918371 28 preStart hashcode:12844205 29 postRestart hashcode:12844205 30 [ERROR] [lifecycle-akka.actor.default-dispatcher-2] [akka://lifecycle/user/Supervisor/restartActor] n ull 31 meet NullPointerException,restart 32 ..... 33 postStop hashcode:12844205
第1行的preStart表示RestartActor正在初始化,注意它的HashCode为7302437。接着,这个Actor遇到了NullPointerException。根据自定义的策略,这将导致它重启,因此,这就有了第3行的preRestart,因为preRestart在正式重启之前调用,因此HashCode还是7302437,表示当前Actor和上一个Actor还是同一个实例。接着,第4~19行打印了异常信息。
第20行进入了preStart()方法,它的HashCode为23269863。这说明系统已经为这个RestartActor生成了一个新的实例,原有的实例因为重启而被回收。新的实例将代替原有实例继续工作。这说明同一个RestartActor在系统的工作始终,未必能保持同一个实例。重启完成后,调用postRestart()方法(第21行)。实际上,Actor重启后的preStart()方法,就是在postRestart()中调用的(Actor父类的postRestart()会调用preStart()方法)。
在经过3次重启后,超过了监督策略中的单位时间内的重试上限。因此,系统不会再进行尝试,而是直接关闭RestartActor。上述输出中第33行就显示了这个过程,在最后一个RestartActor实例上,执行了停止方法。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论