6.5 增强的 Future:CompletableFuture
CompletableFuture是Java 8新增的一个超大型工具类。为什么说它大呢?因为一方面,它实现了Future接口,而更重要的是,它也实现了CompletionStage接口。CompletionStage接口也是在Java 8中新增的。而CompletionStage接口拥有多达约40种方法!是的,你没有看错,这看起来完全不符合设计原则中所谓的“单方法接口”,但是在这里,它就这么存在了。这个接口之所以拥有如此众多的方法,是为了函数式编程中的流式调用准备的。通过CompletionStage提供的接口,我们可以在一个执行结果上进行多次流式调用,以此可以得到最终结果。比如,你可以在一个CompletionStage上进行如下调用:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
这一连串的调用就会挨个执行。
6.5.1 完成了就通知我
CompletableFuture和Future一样,可以作为函数调用的契约。如果你向CompletableFuture请求一个数据,如果数据还没有准备好,请求线程就会等待。而让人惊喜的是,通过CompletableFuture,我们可以手动设置CompletableFuture的完成状态。
01 public static class AskThread implements Runnable { 02 CompletableFuture<Integer> re = null; 03 04 public AskThread(CompletableFuture<Integer> re) { 05 this.re = re; 06 } 07 08 @Override 09 public void run() { 10 int myRe = 0; 11 try { 12 myRe = re.get() * re.get(); 13 } catch (Exception e) { 14 } 15 System.out.println(myRe); 16 } 17 } 18 19 public static void main(String[] args) throws InterruptedException { 20 final CompletableFuture<Integer> future = new CompletableFuture<>(); 21 new Thread(new AskThread(future)).start(); 22 // 模拟长时间的计算过程 23 Thread.sleep(1000); 24 // 告知完成结果 25 future.complete(60); 26 }
上述代码在第1~17行,定义了一个AskThread线程。它接收一个CompletableFuture作为其构造函数,它的任务是计算CompletableFuture表示的数字的平方,并将其打印。
代码第20行,我们创建一个CompletableFuture对象实例,第21行,我们将这个对象实例传递给这个AskThread线程,并启动这个线程。此时,AskThread在执行到第12行代码时会阻塞,因为CompletableFuture中根本没有它所需要的数据,整个CompletableFuture处于未完成状态。第23行用于模拟长时间的计算过程。当计算完成后,可以将最终数据载入CompletableFuture,并标记为完成状态(第25行)。
当第25行代码执行后,表示CompletableFuture已经完成,因此AskThread就可以继续执行了。
6.5.2 异步执行任务
通过CompletableFuture提供的进一步封装,我们很容易实现Future模式那样的异步调用。比如:
01 public static Integer calc(Integer para) { 02 try { 03 // 模拟一个长时间的执行 04 Thread.sleep(1000); 05 } catch (InterruptedException e) { 06 } 07 return para*para; 08 } 09 10 public static void main(String[] args) throws InterruptedException, ExecutionException { 11 final CompletableFuture<Integer> future = 12 CompletableFuture.supplyAsync(() -> calc(50)); 13 System.out.println(future.get()); 14 }
上述代码中,第11~12行使用CompletableFuture.supplyAsync()方法构造一个CompletableFuture实例,在supplyAsync()函数中,它会在一个新的线程中,执行传入的参数。在这里,它会执行calc()方法。而calc()方法的执行可能是比较慢的,但是这不影响CompletableFuture实例的构造速度,因此supplyAsync()会立即返回,它返回的CompletableFuture对象实例就可以作为这次调用的契约,在将来任何场合,用于获得最终的计算结果。代码第13行,试图获得calc()的计算结果,如果当前计算没有完成,则调用get()方法的线程就会等待。
在CompletableFuture中,类似的工厂方法有以下几个:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
其中supplyAsync()方法用于那些需要有返回值的场景,比如计算某个数据等。而runAsync()方法用于没有返回值的场景,比如,仅仅是简单地执行某一个异步动作。
在这两对方法中,都有一个方法可以接收一个Executor参数。这就使我们可以让Supplier <U>或者Runnable在指定的线程池中工作。如果不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行。
注意:在Java 8中,新增了ForkJoinPool.commonPool()方法。它可以获得一个公共的ForkJoin线程池。这个公共线程池中的所有线程都是Daemon线程。这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。
6.5.3 流式调用
在前文中我已经简单的提到,CompletionStage的约40个接口是为函数式编程做准备的。在这里,就让我们看一下,如何使用这些接口进行函数式的流式API调用:
01 public static Integer calc(Integer para) { 02 try { 03 // 模拟一个长时间的执行 04 Thread.sleep(1000); 05 } catch (InterruptedException e) { 06 } 07 return para*para; 08 } 09 10 public static void main(String[] args) throws InterruptedException, ExecutionException { 11 CompletableFuture<Void> fu=CompletableFuture.supplyAsync(() -> calc(50)) 12 .thenApply((i)->Integer.toString(i)) 13 .thenApply((str)->"\""+str+"\"") 14 .thenAccept(System.out::println); 15 fu.get(); 16 }
上述代码中,使用supplyAsync()函数执行一个异步任务。接着连续使用流式调用对任务的处理结果进行再加工,直到最后的结果输出。
这里,我们在第15行执行CompletableFuture.get()方法,目的是等待calc()函数执行完成。如果不进行这个等待调用,由于CompletableFuture异步执行的缘故,主函数不等calc()方法执行完毕就会退出,随着主线程的结束,所有的Daemon线程都会立即退出,从而导致calc()方法无法正常完成。
6.5.4 CompletableFuture中的异常处理
如果CompletableFuture在执行过程中遇到异常,我们可以用函数式编程的风格来优雅地处理这些异常。CompletableFuture提供了一个异常处理方法exceptionally():
01 public static Integer calc(Integer para) { 02 return para / 0; 03 } 04 05 public static void main(String[] args) throws InterruptedException,ExecutionException { 06 CompletableFuture<Void> fu = CompletableFuture 07 .supplyAsync(() -> calc(50)) 08 .exceptionally(ex -> { 09 System.out.println(ex.toString()); 10 return 0; 11 }) 12 .thenApply((i) -> Integer.toString(i)) 13 .thenApply((str) -> "\"" + str + "\"") 14 .thenAccept(System.out::println); 15 fu.get(); 16 }
在上述代码中,第8行对当前的CompletableFuture进行异常处理。如果没有异常发生,则CompletableFuture就会返回原有的结果。如果遇到了异常,就可以在exceptionally()中处理异常,并返回一个默认的值。在上例中,我们忽略了异常堆栈,只是简单地打印异常的信息。
执行上述函数,我们将得到输出:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero "0"
6.5.5 组合多个CompletableFuture
CompletableFuture还允许你将多个CompletableFuture进行组合。一种方法是使用thenCompose(),它的签名如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
一个CompletableFuture可以在执行完成后,将执行结果通过Function传递给下一个CompletionStage进行处理(Function接口返回新的CompletionStage实例):
01 public static Integer calc(Integer para) { 02 return para/2; 03 } 04 05 public static void main(String[] args) throws InterruptedException, ExecutionException { 06 CompletableFuture<Void> fu = 07 CompletableFuture.supplyAsync(() -> calc(50)) 08 .thenCompose((i)->CompletableFuture.supplyAsync(() -> calc(i))) 09 .thenApply((str)->"\"" + str + "\"").thenAccept(System.out::println); 10 fu.get(); 11 }
上述代码第8行,将处理后的结果传递给thenCompose(),并进一步传递给后续新生成的CompletableFuture实例。以上代码的输出如下:
"12"
另外一种组合多个CompletableFuture的方法是thenCombine(),它的签名如下:
public <U,V> CompletableFuture<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
方法thenCombine()首先完成当前CompletableFuture和other的执行。接着,将这两者的执行结果传递给BiFunction(该接口接收两个参数,并有一个返回值),并返回代表BiFunction实例的CompletableFuture对象:
01 public static Integer calc(Integer para) { 02 return para / 2; 03 } 04 05 public static void main(String[] args) throws InterruptedException,ExecutionException { 06 CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50)); 07 CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc(25)); 08 09 CompletableFuture<Void> fu = intFuture.thenCombinet(intFuture2, (i, j) -> (i + j)) 10 .thenApply((str) -> "\"" + str + "\"") 11 .thenAccept(System.out::println); 12 fu.get(); 13 }
上述代码中,首先生成两个CompletableFuture实例(第6~7行),接着使用thenCombine()组合这两个CompletableFuture,将两者的执行结果进行累加(由第9行的(i, j) -> (i + j)实现),并将其累加结果转为字符串,并输出。上述代码的输出是:
"37"
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论