7.11 多个 Actor 同时修改数据:Agent
在Actor的编程模型中,Actor之间主要通过消息进行信息传递。因此,很少发生多个Actor需要访问同一个共享变量的情况。但在实际开发中,这种情况很难完全避免。那如果多个Agent需要对同一个共享变量进行读写时,如何保证线程安全呢?
在Akka中,使用一种叫做Agent的组件来实现这个功能。一个Agent提供了对一个变量的异步更新。当一个Actor希望改变Agent的值时,它会向这个Agent下发一个动作(action)。当多个Actor同时改变Agent时,这些action将会在ExecutionContext中被并发调度执行。在任意时刻,一个Agent最多只能执行一个action,对于某一个线程来说,它执行action的顺序与它的发生顺序一致,但对于不同线程来说,这些action可能会交织在一起。
Agent的修改可以使用两个方法send()或者alter()。它们都可以向Agent发送一个修改动作。但是send()方法没有返回值,而alter()方法会返回一个Future对象便于跟踪Agent的执行。
下面让我们模拟这么一个场景:有10个Actor,它们一起对一个Agent执行累加操作,每个agent累加10000次,如果没有意外,那么agent最终的值将是100000,如果Actor间的调度出现问题,那么这个值可能小于100000。
01 public class CounterActor extends UntypedActor { 02 Mapper addMapper = new Mapper<Integer, Integer>() { 03 @Override 04 public Integer apply(Integer i) { 05 return i+1; 06 } 07 }; 08 09 @Override 10 public void onReceive(Object msg) { 11 if (msg instanceof Integer) { 12 for (int i = 0; i < 10000; i++) { 13 //我希望能够知道future何时结束 14 Future<Integer> f = AgentDemo.counterAgent.alter(addMapper); 15 AgentDemo.futures.add(f); 16 } 17 getContext().stop(getSelf()); 18 } else 19 unhandled(msg); 20 } 21 }
上述代码定义了一个累加的Actor:CounterActor。第2~7行,定义了累计动作action addMapper。它的作用是对Agent的值进行修改,这里简单地加1。
CounterActor的消息处理函数onReceive()中,对全局的counterAgent进行累加操作,alter()指定了累加动作addMapper(第14行)。由于我们希望在将来知道累加行为是否完成,因此在这里将返回的Future对象进行收集(第15行)。完成任务后,Actor自行退出(第17行)。
程序的主函数如下:
01 public class AgentDemo { 02 public static Agent<Integer> counterAgent = Agent.create(0, ExecutionContexts.global()); 03 static ConcurrentLinkedQueue<Future<Integer>> futures = new ConcurrentLinkedQueue<Future <Integer>>(); 04 05 public static void main(String[] args) throws InterruptedException { 06 final ActorSystem system = ActorSystem.create("agentdemo", 07 ConfigFactory.load("samplehello.conf")); 08 ActorRef[] counter = new ActorRef[10]; 09 for (int i = 0; i < counter.length; i++) { 10 counter[i] = system.actorOf(Props.create(CounterActor.class), "counter_" + i); 11 } 12 final Inbox inbox = Inbox.create(system); 13 for (int i = 0; i < counter.length; i++) { 14 inbox.send(counter[i], 1); 15 inbox.watch(counter[i]); 16 } 17 18 int closeCount = 0; 19 //等待所有Actor全部结束 20 while (true) { 21 Object msg = inbox.receive(Duration.create(1, TimeUnit.SECONDS)); 22 if (msg instanceof Terminated) { 23 closeCount++; 24 if (closeCount == counter.length) { 25 break; 26 } 27 } else { 28 System.out.println(msg); 29 } 30 } 31 // 等待所有的累加线程完成,因为他们都是异步的 32 Futures.sequence(futures, system.dispatcher()).onComplete( 33 new OnComplete<Iterable<Integer>>() { 34 @Override 35 public void onComplete(Throwable arg0, Iterable<Integer> arg1) throws Throwable { 36 System.out.println("counterAgent=" + counterAgent.get()); 37 system.shutdown(); 38 } 39 }, system.dispatcher()); 40 } 41 }
上述代码中,第8~11行,创建了10个CounterActor对象。第12~16行,使用Inbox与CounterActor进行通信。第14行的消息将触发CounterActor进行累加操作。第20~30行系统将等待所有10个CounterActor运行结束。执行完成后,我们便已经收集了所有的Future。在第32行,将所有的Future进行串行组合(使用sequence()方法),构造了一个整体的Future,并为它创建onComplete()回调函数。在所有的Agent操作执行完成后,onComplete()方法就会被调用(第35行)。在这个例子中,我们简单地输出最终的counterAgent值(第36行),并关闭系统(第37行)。
执行上述程序,我们将看到:
counterAgent=100000
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论