返回介绍

7.12 像数据库一样操作内存数据:软件事务内存

发布于 2024-08-21 22:20:21 字数 4517 浏览 0 评论 0 收藏 0

在一些函数式编程语言中,支持一种叫做软件事务内存(STM)的技术。什么是软件事务内存呢?这里的事务和数据库中所说的事务非常类似,具有隔离性、原子性和一致性。与数据库事务不同的是,内存事务不具备持久性(很显然内存数据不会保存下来)。

在很多场合,某一项工作可能要由多个Actor协作完成。在这种协作事务中,如果一个Actor处理失败,那么根据事务的原子性,其他Actor所进行的操作必须要回滚。下面,就让我们来看一个简单的案例。

假设有一个公司要给他的员工发放福利,公司账户里有100元。每次,公司账户会给员工账户转一笔钱,假设转账10元,那么公司账户中应该减去10元,同时,员工账户中应该增加10元。这两个操作必须同时完成,或者同时不完成。

首先,让我们看一下主函数中是如何启动一个内存事务的:

01 public class STMDemo {
02   public static ActorRef company=null;
03   public static ActorRef employee=null;
04
05   public static void main(String[] args) throws Exception {
06 final ActorSystem system = ActorSystem.create("transactionDemo", ConfigFactory.load
("samplehello.conf"));
07     company=system.actorOf(Props.create(CompanyActor.class), "company");
08     employee=system.actorOf(Props.create(EmployeeActor.class), "employee");
09
10     Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
11
12     for(int i=1;i<20;i++){
13       company.tell(new Coordinated(i, timeout), ActorRef.noSender());
14       Thread.sleep(200);
15       Integer companyCount = (Integer) Await.result(
16           ask(company, "GetCount", timeout), timeout.duration());
17       Integer employeeCount = (Integer) Await.result(
18           ask(employee, "GetCount", timeout), timeout.duration());
19
20       System.out.println("company count="+companyCount);
21       System.out.println("employee count="+employeeCount);
22       System.out.println("=================");
23     }
24   }
25 }

上述代码中CompanyActor和EmployeeActor分别用于管理公司账户和雇员账户。在第12~23行中,我们尝试进行19次汇款,第一次汇款额度为1元,第二次为2元,依此类推,最后一笔汇款为19元。

在第13行,新建一个Coordinated协调者,并且将这个协调者当做消息发送给company。当company收到这个协调者消息后,自动成为这个事务的第一个成员。

第15~18行询问公司账户和雇员账户的当前余额,并在第20~21行进行输出。

下面是代表公司账户的Actor:

01 public class CompanyActor extends UntypedActor {
02   private Ref.View<Integer> count = STM.newRef(100);
03
04   @Override
05   public void onReceive(Object msg) {
06     if (msg instanceof Coordinated) {
07       final Coordinated c=(Coordinated)msg;
08       final int downCount=(Integer)c.getMessage();
09       STMDemo.employee.tell(c.coordinate(downCount), getSelf());
10       try{
11         c.atomic(new Runnable() {
12           @Override
13           public void run() {
14             if(count.get()<downCount){
15               throw new RuntimeException("less than "+downCount);
16             }
17             STM.increment(count, -downCount);
18           }
19         });
20       }catch(Exception e){
21         e.printStackTrace();
22       }
23
24     }else if ("GetCount".equals(msg)) {
25       getSender().tell(count.get(), getSelf());
26     }else{
27       unhandled(msg);
28     }
29   }
30 }

在CompanyActor中,首先判断接收的msg是否是Coordinated。如果是Coordinated,则表示这是一个新事务的开始。在第8行,获得事务的参数也就是需要转账的金额。接着在第9行,将调用Coordinated.coordinate()方法,将employee也加入到当前事务中,这样这个事务中就有两个参与者了。

第11行,调用了Coordinated.atomic()定义了原子执行块作为这个事务的一部分。在这个执行块中,对公司账户进行余额调整(第17行)。但是当汇款额度大于可用余额时,就会抛出异常,宣告失败。

第25行用于处理GetCount消息,返回当前账户余额。

作为转账接收方的雇员账户如下:

01 public class EmployeeActor extends UntypedActor {
02   private Ref.View<Integer> count = STM.newRef(50);
03
04   @Override
05   public void onReceive(Object msg) {
06     if (msg instanceof Coordinated) {
07       final Coordinated c = (Coordinated) msg;
08       final int downCount = (Integer) c.getMessage();
09       try {
10         c.atomic(new Runnable() {
11           @Override
12           public void run() {
13             STM.increment(count, downCount);
14           }
15         });
16       } catch (Exception e) {
17       }
18     } else if ("GetCount".equals(msg)) {
19       getSender().tell(count.get(), getSelf());
20     } else {
21       unhandled(msg);
22     }
23   }
24 }

上述代码第2行,设置雇员账户初始金额是50元。第6行,判断消息是否为Coordinated,如果是Coordinated,则当前Actor会自动加入Coordinated指定的事务。第10行,定义原子操作,在这个操作中将修改雇员账户余额。在这里,我们并没有给出异常情况的判断,只要接收到转入金额,一律将其增加到雇员账户中。

大家可能就会产生疑问,如果在公司账户中由于余额不足而导致转账失败了,那在这个雇员账户中不还是正常增加了金额吗?那岂不是钱多出来了?

不过这个担心是完全多余的。因为在这里,两个Actor都已经加入到同一个协调事务Coordinated中了,因此当公司账户出现异常后,雇员账户的余额就会回滚。

执行上述程序,部分输出如下:

.....
company count=85
employee count=65
=================
java.lang.RuntimeException: less than 14
company count=9
employee count=141
....
=================
java.lang.RuntimeException: less than 19
  省略堆栈信息 实在太多了
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
company count=9
employee count=141
=================

可以看到,无论转账操作是否成功,公司账户和雇员账户的金额总是一致的。当转账失败时,雇员账户的余额并不会增加。这就是软件事务内存的作用。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文