Java Promise 是 Promise A+ 规范的 Java 实现版本

发布于 2021-01-09 09:57:07 字数 12487 浏览 1371 评论 0

Promise A+ 是 CommonJS 规范提出的一种异步编程解决方案,比传统的解决方案—回调函数和事件—更合理和更强大。promise 实现了 Promise A+ 规范,包装了 java 中对多线程的操作,提供统一的接口,使得控制异步操作更加容易。实现过程中参考文档如下:

基本使用:修改 pom.xml

<repositories>
  <repository>
    <id>wjj-maven-repo</id>
    <url>https://raw.github.com/zhanyingf15/maven-repo/master</url>
  </repository>
</repositories>
<dependency>
  <groupId>com.wjj</groupId>
  <artifactId>promise</artifactId>
  <version>1.0.0</version>
</dependency>

如果maven settings.xml使用了mirror配置,修改mirrorOf

<mirror>
  <id>nexus</id>
  <mirrorOf>*,!wjj-maven-repo</mirrorOf> 
  <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
IPromise promise = new Promise.Builder().promiseHanler(new PromiseHandler() {
  @Override
  public Object run(PromiseExecutor executor) throws Exception {
    return 2*3;
  }
}).build();

上面的例子中创建了一个 Promise 对象,指定 PromiseHandler 实现,在run方法中写具体的业务逻辑,类似于Runable的run方法。promise对象一经创建,将立即异步执行。推荐使用lambda表达式,更加简洁。

IPromise promise = new Promise.Builder().promiseHanler(executor -> {
  return 2*3;
}).build();

获取promise的执行结果通常使用两个方法thenlisten,前者是阻塞的后者是非阻塞的。then方法返回一个新的promise对象,因此支持链式调用。

new Promise.Builder().promiseHanler(executor -> {//promise0
  return 2*3;
}).build().then(resolvedData -> {//返回一个新的promise1
  System.out.println(resolvedData);
  return (Integer)resolvedData+1;
}).then(res2->{
  System.out.println(res2);
  //创建一个新的promise2并返回
  return new Promise.Builder().externalInput(res2).promiseHanler(executor -> {
    return (Integer)executor.getExternalInput()+2;
  }).build();
}).then(res3->{
  System.out.println(res3);
  return res3;
});

从上面可以看到promise0、promise1和Promise2是链式调用的,每一次then方法都返回一个新的promise。在then方法的回调中,如果返回的是一个非promise对象,那么promise被认为是一个fulfilled状态的promise,如果返回的是一个promsie实例,那么该实例将会异步执行。
假如需要异步顺序执行a->b-c->d四个线程,调用顺序如下

new PromiseA()
.then(dataA->new PromiseB())//A的回调
.then(dataB->new PromiseC())//B的回调
.then(dataC->new PromiseD())//C的回调
.then(dataD->xxx)//D的回调
.pCatch(error->xxxx)//捕获中间可能产生的异常

Promise 规范

Promise 规范可以参考 Promise A+规范。其中 ES6 Promise对象 在 Promise A+ 规范上做了一些补充。Java Promise 在使用上基本与 ES6 Promise 对象保持一致,部分地方有些许不同。 Promise 的三个状态

  • pending:等待态,对应线程未执行或执行中
  • fulfilled:完成态,对应线程正常执行完毕,其执行结果称为终值
  • rejected:拒绝态,对应线程异常结束,其异常原因称为拒因
    状态转移只能由pending->fulfilled或pending->rejected,状态一旦发生转移无法再次改变。

使用

IPromise promise = new Promise.Builder().promiseHandler(handler->2*3).build();//mark1
promise.then(resolvedData -> {
  System.out.println(resolvedData);
  return null;
});

创建一个线程非常简单,mark1标注的行创建一个IPromise实例promise,并指定异步逻辑,这里简单地做了个乘法操作。promise实例一经创建,异步逻辑将立即执行,执行结果或执行中抛出的异常将保存在promise实例中。可以在创建promise实例时指定一个线程池。

ExecutorService pool = Promise.pool(5,10);
IPromise promise = new Promise.Builder().pool(pool).promiseHandler(handler->2*3).build();

上面创建了一个最小为5最大为10的线程池,promise实例对应的线程将被提交的线程池中执行。promise可以通过then或listen方法获取执行结果,then方法是阻塞的而listen是非阻塞的。
通常情况下异步逻辑需要访问外部参数,而外部参数往往并不是final的,promise提供了输入外部参数到内部逻辑的方法externalInput

Map<String,String> map = ImmutableMap.of("name","张三");
IPromise promise = new Promise.Builder().externalInput(map).promiseHandler(handler->{
  Map<String,String> m = (Map<String,String>)handler.getExternalInput();
  return "你好:"+m.get("name");
}).build();

resolve 和 reject

resolve和reject是PromiseExecutor类的方法,resolve方法将promise状态由pending->fulfilled,reject方法将promise状态由pending->rejected。如果promise已经是非pending状态,resolve和reject调用将无效。

new Promise.Builder().promiseHandler(handler->{
  int a = 2*3;
  handler.resolve(a);
  return null;
}).build().listen(((resolvedData, e) -> {
  System.out.println(resolvedData);
}));

上面的例子中,计算出a的值,手动将promise状态转移为fulfilled,并将a的值作为promise的终值。同样也可以手动调用handler.reject(e)将promise状态转为rejected,e(e为Throwable实例)作为promise的据因。

在前面和后面的例子中,并没有显示调用handler.resolve(x)方法,而是return具体的结果。因为在 resolve方法之后return之前程序抛出异常,该异常不会更改promise的状态,异常会被内部吞掉,resolve方法已经将promise的状态修改为fulfilled了。

new Promise.Builder().promiseHandler(handler->{
  int a = 2*3;
  handler.resolve(a);
  throw new RuntimeException("err");
}).build()
    .listen(((resolvedData, e) -> {
  System.out.println(resolvedData);
  System.out.println(e==null);
}));

打印结果

6
true

上面的例子中,手动调用resolve方法后,后续逻辑即便是抛出了异常,e 仍然是 null,因为 Promise 状态已经转变为 fulfilled,后续的所有逻辑(包括return的值)已经跟 Promise 的最终状态无关,后续异常和返回结果将被忽略。因此非特殊情况下不建议直接调用 resolve 方法,而是直接 return 返回执行结果。这种方式是 handler.resolve(x) 的隐式做法,return 的结果将作为 Promise 的终值

异常捕获

Promise 的 rejected 状态对应线程异常结束(运行时异常或手动调用 executor.reject(e)),其据因保存了异常实例,这些异常被 Promise 内部吞掉,并不会抛出到当前运行环境中,所有 try...catch 是无法捕获到promise内部逻辑抛出的异常的。Promise 提供了多种方式可以侦测到内部异常:

then方法的第二个参数是可选参数,如果发生异常,第二个参数回调将被执行。

listen和pFinally行为是一致的,onCompleteListener的listen(Object resolvedData,Throwable e)方法第二参数为异常对象,如果发生异常,e为异常实例,否则为null。

pCatch是推荐使用的异常捕获方式,then和listen对于异常只能“观察”不能修正,在promise链式调用时,一旦发生异常,then方法只能观察到异常发生,但是异常仍会向调用链后方传递,并拒绝后面promise的执行。pCatch不同,它捕获到异常后,可以自行根据业务逻辑对异常处理,继续执行后面的promise链。

new Promise.Builder().promiseHandler(executor -> 3).build().then(resolvedData->{//p1
  System.out.println("a:"+resolvedData);
  return new Promise.Builder().promiseHandler(executor -> {//p2
    executor.reject(new RuntimeException("err"));
    return resolvedData;
  }).build();
}).then(resolvedData1 -> {//p3
    System.out.println("b:"+resolvedData1);
    return "b:"+resolvedData1;
  },rejectReason -> {
    System.err.println("c:"+rejectReason);
  }
).then(resolvedData2 -> {//p4
    System.out.println("d:"+resolvedData2);
    return "d:"+resolvedData2;
  },rejectReason -> {
    System.err.println("e:"+rejectReason);
  }
);

执行结果

a:3
c:java.lang.RuntimeException: err
e:java.lang.RuntimeException: err

在上面的例子中,p1,p2,p3链式调用,p1执行后在p2处手动抛出异常,p2的then侦测到异常,p3,p4的正常逻辑被取消了

new Promise.Builder().promiseHandler(executor -> 3).build().then(resolvedData->{
  System.out.println("a:"+resolvedData);
  return new Promise.Builder().promiseHandler(executor -> {
    executor.reject(new RuntimeException("err"));
    return resolvedData;
  }).build();
}).pCatch(e->{
  System.out.println("捕获到异常");
  return 3;
}).then(resolvedData1 -> {
    System.out.println("b:"+resolvedData1);
    return "b:"+resolvedData1;
  },rejectReason -> {
    System.err.println("c:"+rejectReason);
  }
).then(resolvedData2 -> {
    System.out.println("d:"+resolvedData2);
    return "d:"+resolvedData2;
  },rejectReason -> {
    System.err.println("e:"+rejectReason);
  }
);

打印结果

a:3
捕获到异常
b:3
d:b:3

pCatch捕获到异常后,返回一个修正值3,这个值会传递个下一个promise处理,继续完成链式调用。pCatch也可以直接返回一个promise,promise的状态决定是否继续后续链的执行(如果pCatch返回的promise是rejected状态仍然会拒绝后续promise的执行直到遇到下一个pCatch)

pCatch可以在promise链的任何位置出现,出现的次数不受限制,如果没有异常出现,将忽略pCatch逻辑。listen和pFinally只能在链的末尾出现,无论异常是否发生,它都将被调用(类似于try...catch...finally)。

Promise 组合

由于开发中无法预计线程什么时候执行结束,有时需要拿到线程执行结果在进行下一步操作就比较麻烦。如果是单个promise,可以简单地使用then方法阻塞当前线程,等待promise线程执行完毕。如果是多个promise并行执行,需要等待所有的promise都执行完毕才能执行下一步,可以使用all或者waitAll方法。

IPromise p1 = new Promise.Builder().promiseHandler(executor -> {
  Thread.sleep(1000);
  return 1;
}).build();
IPromise p2 = new Promise.Builder().promiseHandler(executor -> {
  Thread.sleep(4000);
  return 2;
}).build();
IPromise p3 = new Promise.Builder().promiseHandler(executor -> {
  Thread.sleep(2000);
  return 3;
}).build();
Promise.all(p1,p2,p3).then(resolvedData -> {
  Object[] datas = (Object[])resolvedData;
  for(Object d:datas){
    System.out.println(d);
  }
  return null;
},e->e.printStackTrace());

上面创建了三个promise,Promise.all将三个promise组装成一个新promise p,新的promise p的状态将由p1-p3的状态决定,如果p1-p3全部正常结束,p的状态是fulfilled,其终值是一个数组,按传入顺序保存p1-p3的执行结果。依据promise规范,如果p1-p3任意一个异常结束或手动调用executor.reject()方法将pn状态转为rejected,p的状态会转为rejected,并尝试取消其余promise。具体可以参考 wiki all

1.0.1 版本 all 还有一个重载方法 all(ExecutorService threadPool,final IPromise ...promises),可以指定 p 的执行环境,不指定线程池默认新开一个线程。

在有些情况下,当 p1-p3 的其中一个发生异常时,并不希望p的状态立即转变为rejected并尝试取消其余promise的执行,而是希望其余promise继续执行,可以使用waitAll()方法。Promise.waitAll将多个promise组装成一个新promise p,不同于all,p1-p3的状态不会影响p的状态,如果p自身未发生异常(waitAll内部使用了CountDownLatch处理多个线程,可能会有异常),p 的状态一直是 fulfilled,其终值是一个数组,数组值是pn的终值或据因。具体可参考 wiki-waitAll 使用方式如下:

IPromise p1 = new Promise.Builder().promiseHandler(handler->2*3).build();
IPromise p2 = new Promise.Builder().promiseHandler(handler->{
  throw new RuntimeException("手动抛出异常");
}).build();
IPromise p = Promise.waitAll(p1,p2).then(resolvedData -> {
  Object[] datas = (Object[]) resolvedData;
  for(Object d:datas){
    if(d instanceof Throwable){
      ((Throwable)d).printStackTrace();
    }else{
      System.out.println(d);
    }
  }
  return datas;
});

输出结果

6
java.lang.RuntimeException: 手动抛出异常

p1为正常执行完毕,其终值为6,p2手动抛出异常,使用waitAll后,p的终值为一个数组,遍历数组需要判断值的类型。

类似于all,Promise.race方法将多个 Promise p1,...pn实例,包装成一个新的 Promise 实例 p,只要p1-pn有一个状态发生改变(无论是转变为正常状态还是异常状态),p的状态立即改变,并尝试取消其余promise的执行。第一个改变的promise的状态和终值作为p的状态和终值

Promise.resolve 和 Promise.pTry

这两个方法都是Promise的静态方法。Promise.resolve方法有多个重载,最重要的一个是resolve(Object object,String methodName,List<Object> args),该方法是将object的指定方法以异步方式执行,该方法的执行结果作为Promise的终值,具体可参考 wiki Promise.resolve

pTry方法将object的指定方法以同步方式执行,该方法的执行结果作为Promise的终值,如果object为IPromise实例,将忽略methodName和args参数,异步执行该实例。
该方法是以Promise统一处理同步和异步方法,不管object是同步操作还是异步操作,都可以使用then指定下一步流程,用pCatch方法捕获异常,避免开发中出现以下情况

try{
  object.doSomething(args1,args2);//可能会抛出异常
  promise.then(resolvedData->{
    //一些逻辑
  }).then(resolvedData->{
    //一些逻辑
  }).pCatch(e->{
    //异常处理逻辑
  })
}catch(Exception e){
  //异常处理逻辑
}

使用 pTry,可以简化异常处理

List args = new ArrayList(){args1,args2};
Promise.pTry(object,"doSomething",args)
.then(resolvedData->{
    //一些逻辑
}).then(resolvedData->{
  //一些逻辑
}).pCatch(e->{
  //异常处理逻辑
})

github 地址:https://github.com/zhanyingf15/promise

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

JSmiles

生命进入颠沛而奔忙的本质状态,并将以不断告别和相遇的陈旧方式继续下去。

0 文章
0 评论
84960 人气
更多

推荐作者

遂心如意

文章 0 评论 0

5513090242

文章 0 评论 0

巷雨优美回忆

文章 0 评论 0

junpengz2000

文章 0 评论 0

13郎

文章 0 评论 0

qq_xU4RDg

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文