7.12 像数据库一样操作内存数据:软件事务内存
在一些函数式编程语言中,支持一种叫做软件事务内存(STM)的技术。什么是软件事务内存呢?这里的事务和数据库中所说的事务非常类似,具有隔离性、原子性和一致性。与数据库事务不同的是,内存事务不具备持久性(很显然内存数据不会保存下来)。
在很多场合,某一项工作可能要由多个Actor协作完成。在这种协作事务中,如果一个Actor处理失败,那么根据事务的原子性,其他Actor所进行的操作必须要回滚。下面,就让我们来看一个简单的案例。
假设有一个公司要给他的员工发放福利,公司账户里有100元。每次,公司账户会给员工账户转一笔钱,假设转账10元,那么公司账户中应该减去10元,同时,员工账户中应该增加10元。这两个操作必须同时完成,或者同时不完成。
首先,让我们看一下主函数中是如何启动一个内存事务的:
01 public class STMDemo { 02 public static ActorRef company=null; 03 public static ActorRef employee=null; 04 05 public static void main(String[] args) throws Exception { 06 final ActorSystem system = ActorSystem.create("transactionDemo", ConfigFactory.load ("samplehello.conf")); 07 company=system.actorOf(Props.create(CompanyActor.class), "company"); 08 employee=system.actorOf(Props.create(EmployeeActor.class), "employee"); 09 10 Timeout timeout = new Timeout(1, TimeUnit.SECONDS); 11 12 for(int i=1;i<20;i++){ 13 company.tell(new Coordinated(i, timeout), ActorRef.noSender()); 14 Thread.sleep(200); 15 Integer companyCount = (Integer) Await.result( 16 ask(company, "GetCount", timeout), timeout.duration()); 17 Integer employeeCount = (Integer) Await.result( 18 ask(employee, "GetCount", timeout), timeout.duration()); 19 20 System.out.println("company count="+companyCount); 21 System.out.println("employee count="+employeeCount); 22 System.out.println("================="); 23 } 24 } 25 }
上述代码中CompanyActor和EmployeeActor分别用于管理公司账户和雇员账户。在第12~23行中,我们尝试进行19次汇款,第一次汇款额度为1元,第二次为2元,依此类推,最后一笔汇款为19元。
在第13行,新建一个Coordinated协调者,并且将这个协调者当做消息发送给company。当company收到这个协调者消息后,自动成为这个事务的第一个成员。
第15~18行询问公司账户和雇员账户的当前余额,并在第20~21行进行输出。
下面是代表公司账户的Actor:
01 public class CompanyActor extends UntypedActor { 02 private Ref.View<Integer> count = STM.newRef(100); 03 04 @Override 05 public void onReceive(Object msg) { 06 if (msg instanceof Coordinated) { 07 final Coordinated c=(Coordinated)msg; 08 final int downCount=(Integer)c.getMessage(); 09 STMDemo.employee.tell(c.coordinate(downCount), getSelf()); 10 try{ 11 c.atomic(new Runnable() { 12 @Override 13 public void run() { 14 if(count.get()<downCount){ 15 throw new RuntimeException("less than "+downCount); 16 } 17 STM.increment(count, -downCount); 18 } 19 }); 20 }catch(Exception e){ 21 e.printStackTrace(); 22 } 23 24 }else if ("GetCount".equals(msg)) { 25 getSender().tell(count.get(), getSelf()); 26 }else{ 27 unhandled(msg); 28 } 29 } 30 }
在CompanyActor中,首先判断接收的msg是否是Coordinated。如果是Coordinated,则表示这是一个新事务的开始。在第8行,获得事务的参数也就是需要转账的金额。接着在第9行,将调用Coordinated.coordinate()方法,将employee也加入到当前事务中,这样这个事务中就有两个参与者了。
第11行,调用了Coordinated.atomic()定义了原子执行块作为这个事务的一部分。在这个执行块中,对公司账户进行余额调整(第17行)。但是当汇款额度大于可用余额时,就会抛出异常,宣告失败。
第25行用于处理GetCount消息,返回当前账户余额。
作为转账接收方的雇员账户如下:
01 public class EmployeeActor extends UntypedActor { 02 private Ref.View<Integer> count = STM.newRef(50); 03 04 @Override 05 public void onReceive(Object msg) { 06 if (msg instanceof Coordinated) { 07 final Coordinated c = (Coordinated) msg; 08 final int downCount = (Integer) c.getMessage(); 09 try { 10 c.atomic(new Runnable() { 11 @Override 12 public void run() { 13 STM.increment(count, downCount); 14 } 15 }); 16 } catch (Exception e) { 17 } 18 } else if ("GetCount".equals(msg)) { 19 getSender().tell(count.get(), getSelf()); 20 } else { 21 unhandled(msg); 22 } 23 } 24 }
上述代码第2行,设置雇员账户初始金额是50元。第6行,判断消息是否为Coordinated,如果是Coordinated,则当前Actor会自动加入Coordinated指定的事务。第10行,定义原子操作,在这个操作中将修改雇员账户余额。在这里,我们并没有给出异常情况的判断,只要接收到转入金额,一律将其增加到雇员账户中。
大家可能就会产生疑问,如果在公司账户中由于余额不足而导致转账失败了,那在这个雇员账户中不还是正常增加了金额吗?那岂不是钱多出来了?
不过这个担心是完全多余的。因为在这里,两个Actor都已经加入到同一个协调事务Coordinated中了,因此当公司账户出现异常后,雇员账户的余额就会回滚。
执行上述程序,部分输出如下:
..... company count=85 employee count=65 ================= java.lang.RuntimeException: less than 14 company count=9 employee count=141 .... ================= java.lang.RuntimeException: less than 19 省略堆栈信息 实在太多了 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) company count=9 employee count=141 =================
可以看到,无论转账操作是否成功,公司账户和雇员账户的金额总是一致的。当转账失败时,雇员账户的余额并不会增加。这就是软件事务内存的作用。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论