Java Promise 是 Promise A+ 规范的 Java 实现版本
Promise A+ 是 CommonJS 规范提出的一种异步编程解决方案,比传统的解决方案—回调函数和事件—更合理和更强大。promise 实现了 Promise A+ 规范,包装了 java 中对多线程的操作,提供统一的接口,使得控制异步操作更加容易。实现过程中参考文档如下:
基本使用:修改 pom.xml
<repositories>
<repository>
<id>wjj-maven-repo</id>
<url>https://raw.github.com/zhanyingf15/maven-repo/master</url>
</repository>
</repositories>
<dependency>
<groupId>com.wjj</groupId>
<artifactId>promise</artifactId>
<version>1.0.0</version>
</dependency>
如果maven settings.xml使用了mirror配置,修改mirrorOf
<mirror>
<id>nexus</id>
<mirrorOf>*,!wjj-maven-repo</mirrorOf>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
IPromise promise = new Promise.Builder().promiseHanler(new PromiseHandler() {
@Override
public Object run(PromiseExecutor executor) throws Exception {
return 2*3;
}
}).build();
上面的例子中创建了一个 Promise 对象,指定 PromiseHandler 实现,在run方法中写具体的业务逻辑,类似于Runable的run方法。promise对象一经创建,将立即异步执行。推荐使用lambda表达式,更加简洁。
IPromise promise = new Promise.Builder().promiseHanler(executor -> {
return 2*3;
}).build();
获取promise的执行结果通常使用两个方法then
和listen
,前者是阻塞的后者是非阻塞的。then方法返回一个新的promise对象,因此支持链式调用。
new Promise.Builder().promiseHanler(executor -> {//promise0
return 2*3;
}).build().then(resolvedData -> {//返回一个新的promise1
System.out.println(resolvedData);
return (Integer)resolvedData+1;
}).then(res2->{
System.out.println(res2);
//创建一个新的promise2并返回
return new Promise.Builder().externalInput(res2).promiseHanler(executor -> {
return (Integer)executor.getExternalInput()+2;
}).build();
}).then(res3->{
System.out.println(res3);
return res3;
});
从上面可以看到promise0、promise1和Promise2是链式调用的,每一次then方法都返回一个新的promise。在then方法的回调中,如果返回的是一个非promise对象,那么promise被认为是一个fulfilled状态的promise,如果返回的是一个promsie实例,那么该实例将会异步执行。
假如需要异步顺序执行a->b-c->d四个线程,调用顺序如下
new PromiseA()
.then(dataA->new PromiseB())//A的回调
.then(dataB->new PromiseC())//B的回调
.then(dataC->new PromiseD())//C的回调
.then(dataD->xxx)//D的回调
.pCatch(error->xxxx)//捕获中间可能产生的异常
Promise 规范
Promise 规范可以参考 Promise A+规范。其中 ES6 Promise对象 在 Promise A+ 规范上做了一些补充。Java Promise 在使用上基本与 ES6 Promise 对象保持一致,部分地方有些许不同。 Promise 的三个状态
- pending:等待态,对应线程未执行或执行中
- fulfilled:完成态,对应线程正常执行完毕,其执行结果称为终值
- rejected:拒绝态,对应线程异常结束,其异常原因称为拒因
状态转移只能由pending->fulfilled或pending->rejected,状态一旦发生转移无法再次改变。
使用
IPromise promise = new Promise.Builder().promiseHandler(handler->2*3).build();//mark1
promise.then(resolvedData -> {
System.out.println(resolvedData);
return null;
});
创建一个线程非常简单,mark1标注的行创建一个IPromise实例promise,并指定异步逻辑,这里简单地做了个乘法操作。promise实例一经创建,异步逻辑将立即执行,执行结果或执行中抛出的异常将保存在promise实例中。可以在创建promise实例时指定一个线程池。
ExecutorService pool = Promise.pool(5,10);
IPromise promise = new Promise.Builder().pool(pool).promiseHandler(handler->2*3).build();
上面创建了一个最小为5最大为10的线程池,promise实例对应的线程将被提交的线程池中执行。promise可以通过then或listen方法获取执行结果,then方法是阻塞的而listen是非阻塞的。
通常情况下异步逻辑需要访问外部参数,而外部参数往往并不是final的,promise提供了输入外部参数到内部逻辑的方法externalInput
。
Map<String,String> map = ImmutableMap.of("name","张三");
IPromise promise = new Promise.Builder().externalInput(map).promiseHandler(handler->{
Map<String,String> m = (Map<String,String>)handler.getExternalInput();
return "你好:"+m.get("name");
}).build();
resolve 和 reject
resolve和reject是PromiseExecutor类的方法,resolve方法将promise状态由pending->fulfilled,reject方法将promise状态由pending->rejected。如果promise已经是非pending状态,resolve和reject调用将无效。
new Promise.Builder().promiseHandler(handler->{
int a = 2*3;
handler.resolve(a);
return null;
}).build().listen(((resolvedData, e) -> {
System.out.println(resolvedData);
}));
上面的例子中,计算出a的值,手动将promise状态转移为fulfilled,并将a的值作为promise的终值。同样也可以手动调用handler.reject(e)
将promise状态转为rejected,e(e为Throwable实例)作为promise的据因。
在前面和后面的例子中,并没有显示调用handler.resolve(x)方法,而是return具体的结果。因为在 resolve方法之后return之前程序抛出异常,该异常不会更改promise的状态,异常会被内部吞掉,resolve方法已经将promise的状态修改为fulfilled了。
new Promise.Builder().promiseHandler(handler->{
int a = 2*3;
handler.resolve(a);
throw new RuntimeException("err");
}).build()
.listen(((resolvedData, e) -> {
System.out.println(resolvedData);
System.out.println(e==null);
}));
打印结果
6
true
上面的例子中,手动调用resolve方法后,后续逻辑即便是抛出了异常,e 仍然是 null,因为 Promise 状态已经转变为 fulfilled,后续的所有逻辑(包括return的值)已经跟 Promise 的最终状态无关,后续异常和返回结果将被忽略。因此非特殊情况下不建议直接调用 resolve 方法,而是直接 return 返回执行结果。这种方式是 handler.resolve(x) 的隐式做法,return 的结果将作为 Promise 的终值
异常捕获
Promise 的 rejected 状态对应线程异常结束(运行时异常或手动调用 executor.reject(e)),其据因保存了异常实例,这些异常被 Promise 内部吞掉,并不会抛出到当前运行环境中,所有 try...catch 是无法捕获到promise内部逻辑抛出的异常的。Promise 提供了多种方式可以侦测到内部异常:
then(onFulfilledExecutor,onRejectedExecutor)
API 参考 wiki-thenlisten(onCompleteListener),pFinally(onCompleteListener)
API 参考 wiki-listenpCatch(onCatchedExecutor)
API 参考 wiki-pCatch
then方法的第二个参数是可选参数,如果发生异常,第二个参数回调将被执行。
listen和pFinally行为是一致的,onCompleteListener的listen(Object resolvedData,Throwable e)方法第二参数为异常对象,如果发生异常,e为异常实例,否则为null。
pCatch是推荐使用的异常捕获方式,then和listen对于异常只能“观察”不能修正,在promise链式调用时,一旦发生异常,then方法只能观察到异常发生,但是异常仍会向调用链后方传递,并拒绝后面promise的执行。pCatch不同,它捕获到异常后,可以自行根据业务逻辑对异常处理,继续执行后面的promise链。
new Promise.Builder().promiseHandler(executor -> 3).build().then(resolvedData->{//p1
System.out.println("a:"+resolvedData);
return new Promise.Builder().promiseHandler(executor -> {//p2
executor.reject(new RuntimeException("err"));
return resolvedData;
}).build();
}).then(resolvedData1 -> {//p3
System.out.println("b:"+resolvedData1);
return "b:"+resolvedData1;
},rejectReason -> {
System.err.println("c:"+rejectReason);
}
).then(resolvedData2 -> {//p4
System.out.println("d:"+resolvedData2);
return "d:"+resolvedData2;
},rejectReason -> {
System.err.println("e:"+rejectReason);
}
);
执行结果
a:3
c:java.lang.RuntimeException: err
e:java.lang.RuntimeException: err
在上面的例子中,p1,p2,p3链式调用,p1执行后在p2处手动抛出异常,p2的then侦测到异常,p3,p4的正常逻辑被取消了
new Promise.Builder().promiseHandler(executor -> 3).build().then(resolvedData->{
System.out.println("a:"+resolvedData);
return new Promise.Builder().promiseHandler(executor -> {
executor.reject(new RuntimeException("err"));
return resolvedData;
}).build();
}).pCatch(e->{
System.out.println("捕获到异常");
return 3;
}).then(resolvedData1 -> {
System.out.println("b:"+resolvedData1);
return "b:"+resolvedData1;
},rejectReason -> {
System.err.println("c:"+rejectReason);
}
).then(resolvedData2 -> {
System.out.println("d:"+resolvedData2);
return "d:"+resolvedData2;
},rejectReason -> {
System.err.println("e:"+rejectReason);
}
);
打印结果
a:3
捕获到异常
b:3
d:b:3
pCatch捕获到异常后,返回一个修正值3,这个值会传递个下一个promise处理,继续完成链式调用。pCatch也可以直接返回一个promise,promise的状态决定是否继续后续链的执行(如果pCatch返回的promise是rejected状态仍然会拒绝后续promise的执行直到遇到下一个pCatch)
pCatch可以在promise链的任何位置出现,出现的次数不受限制,如果没有异常出现,将忽略pCatch逻辑。listen和pFinally只能在链的末尾出现,无论异常是否发生,它都将被调用(类似于try...catch...finally)。
Promise 组合
由于开发中无法预计线程什么时候执行结束,有时需要拿到线程执行结果在进行下一步操作就比较麻烦。如果是单个promise,可以简单地使用then方法阻塞当前线程,等待promise线程执行完毕。如果是多个promise并行执行,需要等待所有的promise都执行完毕才能执行下一步,可以使用all或者waitAll方法。
IPromise p1 = new Promise.Builder().promiseHandler(executor -> {
Thread.sleep(1000);
return 1;
}).build();
IPromise p2 = new Promise.Builder().promiseHandler(executor -> {
Thread.sleep(4000);
return 2;
}).build();
IPromise p3 = new Promise.Builder().promiseHandler(executor -> {
Thread.sleep(2000);
return 3;
}).build();
Promise.all(p1,p2,p3).then(resolvedData -> {
Object[] datas = (Object[])resolvedData;
for(Object d:datas){
System.out.println(d);
}
return null;
},e->e.printStackTrace());
上面创建了三个promise,Promise.all将三个promise组装成一个新promise p,新的promise p的状态将由p1-p3的状态决定,如果p1-p3全部正常结束,p的状态是fulfilled,其终值是一个数组,按传入顺序保存p1-p3的执行结果。依据promise规范,如果p1-p3任意一个异常结束或手动调用executor.reject()方法将pn状态转为rejected,p的状态会转为rejected,并尝试取消其余promise。具体可以参考 wiki all。
1.0.1 版本 all 还有一个重载方法 all(ExecutorService threadPool,final IPromise ...promises),可以指定 p 的执行环境,不指定线程池默认新开一个线程。
在有些情况下,当 p1-p3 的其中一个发生异常时,并不希望p的状态立即转变为rejected并尝试取消其余promise的执行,而是希望其余promise继续执行,可以使用waitAll()方法。Promise.waitAll将多个promise组装成一个新promise p,不同于all,p1-p3的状态不会影响p的状态,如果p自身未发生异常(waitAll内部使用了CountDownLatch处理多个线程,可能会有异常),p 的状态一直是 fulfilled,其终值是一个数组,数组值是pn的终值或据因。具体可参考 wiki-waitAll 使用方式如下:
IPromise p1 = new Promise.Builder().promiseHandler(handler->2*3).build();
IPromise p2 = new Promise.Builder().promiseHandler(handler->{
throw new RuntimeException("手动抛出异常");
}).build();
IPromise p = Promise.waitAll(p1,p2).then(resolvedData -> {
Object[] datas = (Object[]) resolvedData;
for(Object d:datas){
if(d instanceof Throwable){
((Throwable)d).printStackTrace();
}else{
System.out.println(d);
}
}
return datas;
});
输出结果
6
java.lang.RuntimeException: 手动抛出异常
p1为正常执行完毕,其终值为6,p2手动抛出异常,使用waitAll后,p的终值为一个数组,遍历数组需要判断值的类型。
类似于all,Promise.race方法将多个 Promise p1,...pn实例,包装成一个新的 Promise 实例 p,只要p1-pn有一个状态发生改变(无论是转变为正常状态还是异常状态),p的状态立即改变,并尝试取消其余promise的执行。第一个改变的promise的状态和终值作为p的状态和终值
Promise.resolve 和 Promise.pTry
这两个方法都是Promise的静态方法。Promise.resolve方法有多个重载,最重要的一个是resolve(Object object,String methodName,List<Object> args)
,该方法是将object的指定方法以异步方式执行,该方法的执行结果作为Promise的终值,具体可参考 wiki Promise.resolve。
pTry方法将object的指定方法以同步方式执行,该方法的执行结果作为Promise的终值,如果object为IPromise实例,将忽略methodName和args参数,异步执行该实例。
该方法是以Promise统一处理同步和异步方法,不管object是同步操作还是异步操作,都可以使用then指定下一步流程,用pCatch方法捕获异常,避免开发中出现以下情况
try{
object.doSomething(args1,args2);//可能会抛出异常
promise.then(resolvedData->{
//一些逻辑
}).then(resolvedData->{
//一些逻辑
}).pCatch(e->{
//异常处理逻辑
})
}catch(Exception e){
//异常处理逻辑
}
使用 pTry,可以简化异常处理
List args = new ArrayList(){args1,args2};
Promise.pTry(object,"doSomething",args)
.then(resolvedData->{
//一些逻辑
}).then(resolvedData->{
//一些逻辑
}).pCatch(e->{
//异常处理逻辑
})
github 地址:https://github.com/zhanyingf15/promise
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论