返回介绍

7.11 多个 Actor 同时修改数据:Agent

发布于 2024-08-21 22:20:21 字数 3474 浏览 0 评论 0 收藏 0

在Actor的编程模型中,Actor之间主要通过消息进行信息传递。因此,很少发生多个Actor需要访问同一个共享变量的情况。但在实际开发中,这种情况很难完全避免。那如果多个Agent需要对同一个共享变量进行读写时,如何保证线程安全呢?

在Akka中,使用一种叫做Agent的组件来实现这个功能。一个Agent提供了对一个变量的异步更新。当一个Actor希望改变Agent的值时,它会向这个Agent下发一个动作(action)。当多个Actor同时改变Agent时,这些action将会在ExecutionContext中被并发调度执行。在任意时刻,一个Agent最多只能执行一个action,对于某一个线程来说,它执行action的顺序与它的发生顺序一致,但对于不同线程来说,这些action可能会交织在一起。

Agent的修改可以使用两个方法send()或者alter()。它们都可以向Agent发送一个修改动作。但是send()方法没有返回值,而alter()方法会返回一个Future对象便于跟踪Agent的执行。

下面让我们模拟这么一个场景:有10个Actor,它们一起对一个Agent执行累加操作,每个agent累加10000次,如果没有意外,那么agent最终的值将是100000,如果Actor间的调度出现问题,那么这个值可能小于100000。

01 public class CounterActor extends UntypedActor {
02   Mapper addMapper = new Mapper<Integer, Integer>() {
03     @Override
04     public Integer apply(Integer i) {
05       return i+1;
06     }
07   };
08
09   @Override
10   public void onReceive(Object msg) {
11     if (msg instanceof Integer) {
12       for (int i = 0; i < 10000; i++) {
13         //我希望能够知道future何时结束
14         Future<Integer> f = AgentDemo.counterAgent.alter(addMapper);
15         AgentDemo.futures.add(f);
16       }
17       getContext().stop(getSelf());
18     } else
19       unhandled(msg);
20   }
21 }

上述代码定义了一个累加的Actor:CounterActor。第2~7行,定义了累计动作action addMapper。它的作用是对Agent的值进行修改,这里简单地加1。

CounterActor的消息处理函数onReceive()中,对全局的counterAgent进行累加操作,alter()指定了累加动作addMapper(第14行)。由于我们希望在将来知道累加行为是否完成,因此在这里将返回的Future对象进行收集(第15行)。完成任务后,Actor自行退出(第17行)。

程序的主函数如下:

01 public class AgentDemo {
02   public static Agent<Integer> counterAgent = Agent.create(0, ExecutionContexts.global());
03   static ConcurrentLinkedQueue<Future<Integer>> futures = new ConcurrentLinkedQueue<Future
<Integer>>();
04
05   public static void main(String[] args) throws InterruptedException {
06     final ActorSystem system = ActorSystem.create("agentdemo",
07         ConfigFactory.load("samplehello.conf"));
08     ActorRef[] counter = new ActorRef[10];
09     for (int i = 0; i < counter.length; i++) {
10       counter[i] = system.actorOf(Props.create(CounterActor.class), "counter_" + i);
11     }
12     final Inbox inbox = Inbox.create(system);
13     for (int i = 0; i < counter.length; i++) {
14       inbox.send(counter[i], 1);
15       inbox.watch(counter[i]);
16     }
17
18     int closeCount = 0;
19     //等待所有Actor全部结束
20     while (true) {
21       Object msg = inbox.receive(Duration.create(1, TimeUnit.SECONDS));
22       if (msg instanceof Terminated) {
23         closeCount++;
24         if (closeCount == counter.length) {
25           break;
26         }
27       } else {
28         System.out.println(msg);
29       }
30     }
31     // 等待所有的累加线程完成,因为他们都是异步的
32     Futures.sequence(futures, system.dispatcher()).onComplete(
33         new OnComplete<Iterable<Integer>>() {
34           @Override
35           public void onComplete(Throwable arg0, Iterable<Integer> arg1) throws Throwable {
36             System.out.println("counterAgent=" + counterAgent.get());
37             system.shutdown();
38           }
39         }, system.dispatcher());
40   }
41 }

上述代码中,第8~11行,创建了10个CounterActor对象。第12~16行,使用Inbox与CounterActor进行通信。第14行的消息将触发CounterActor进行累加操作。第20~30行系统将等待所有10个CounterActor运行结束。执行完成后,我们便已经收集了所有的Future。在第32行,将所有的Future进行串行组合(使用sequence()方法),构造了一个整体的Future,并为它创建onComplete()回调函数。在所有的Agent操作执行完成后,onComplete()方法就会被调用(第35行)。在这个例子中,我们简单地输出最终的counterAgent值(第36行),并关闭系统(第37行)。

执行上述程序,我们将看到:

counterAgent=100000

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文