返回介绍

6.5 增强的 Future:CompletableFuture

发布于 2024-08-21 22:20:21 字数 7096 浏览 0 评论 0 收藏 0

CompletableFuture是Java 8新增的一个超大型工具类。为什么说它大呢?因为一方面,它实现了Future接口,而更重要的是,它也实现了CompletionStage接口。CompletionStage接口也是在Java 8中新增的。而CompletionStage接口拥有多达约40种方法!是的,你没有看错,这看起来完全不符合设计原则中所谓的“单方法接口”,但是在这里,它就这么存在了。这个接口之所以拥有如此众多的方法,是为了函数式编程中的流式调用准备的。通过CompletionStage提供的接口,我们可以在一个执行结果上进行多次流式调用,以此可以得到最终结果。比如,你可以在一个CompletionStage上进行如下调用:

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() ->
System.out.println())

这一连串的调用就会挨个执行。

6.5.1 完成了就通知我

CompletableFuture和Future一样,可以作为函数调用的契约。如果你向CompletableFuture请求一个数据,如果数据还没有准备好,请求线程就会等待。而让人惊喜的是,通过CompletableFuture,我们可以手动设置CompletableFuture的完成状态。

01 public static class AskThread implements Runnable {
02   CompletableFuture<Integer> re = null;
03
04   public AskThread(CompletableFuture<Integer> re) {
05     this.re = re;
06   }
07
08   @Override
09   public void run() {
10     int myRe = 0;
11     try {
12       myRe = re.get() * re.get();
13     } catch (Exception e) {
14     }
15     System.out.println(myRe);
16   }
17 }
18
19 public static void main(String[] args) throws InterruptedException {
20   final CompletableFuture<Integer> future = new CompletableFuture<>();
21   new Thread(new AskThread(future)).start();
22   // 模拟长时间的计算过程
23   Thread.sleep(1000);
24   // 告知完成结果
25   future.complete(60);
26 }

上述代码在第1~17行,定义了一个AskThread线程。它接收一个CompletableFuture作为其构造函数,它的任务是计算CompletableFuture表示的数字的平方,并将其打印。

代码第20行,我们创建一个CompletableFuture对象实例,第21行,我们将这个对象实例传递给这个AskThread线程,并启动这个线程。此时,AskThread在执行到第12行代码时会阻塞,因为CompletableFuture中根本没有它所需要的数据,整个CompletableFuture处于未完成状态。第23行用于模拟长时间的计算过程。当计算完成后,可以将最终数据载入CompletableFuture,并标记为完成状态(第25行)。

当第25行代码执行后,表示CompletableFuture已经完成,因此AskThread就可以继续执行了。

6.5.2 异步执行任务

通过CompletableFuture提供的进一步封装,我们很容易实现Future模式那样的异步调用。比如:

01 public static Integer calc(Integer para) {
02   try {
03     // 模拟一个长时间的执行
04     Thread.sleep(1000);
05   } catch (InterruptedException e) {
06   }
07   return para*para;
08 }
09
10 public static void main(String[] args) throws InterruptedException, ExecutionException
{
11   final CompletableFuture<Integer> future =
12       CompletableFuture.supplyAsync(() -> calc(50));
13   System.out.println(future.get());
14 }

上述代码中,第11~12行使用CompletableFuture.supplyAsync()方法构造一个CompletableFuture实例,在supplyAsync()函数中,它会在一个新的线程中,执行传入的参数。在这里,它会执行calc()方法。而calc()方法的执行可能是比较慢的,但是这不影响CompletableFuture实例的构造速度,因此supplyAsync()会立即返回,它返回的CompletableFuture对象实例就可以作为这次调用的契约,在将来任何场合,用于获得最终的计算结果。代码第13行,试图获得calc()的计算结果,如果当前计算没有完成,则调用get()方法的线程就会等待。

在CompletableFuture中,类似的工厂方法有以下几个:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

其中supplyAsync()方法用于那些需要有返回值的场景,比如计算某个数据等。而runAsync()方法用于没有返回值的场景,比如,仅仅是简单地执行某一个异步动作。

在这两对方法中,都有一个方法可以接收一个Executor参数。这就使我们可以让Supplier <U>或者Runnable在指定的线程池中工作。如果不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行。

注意:在Java 8中,新增了ForkJoinPool.commonPool()方法。它可以获得一个公共的ForkJoin线程池。这个公共线程池中的所有线程都是Daemon线程。这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。

6.5.3 流式调用

在前文中我已经简单的提到,CompletionStage的约40个接口是为函数式编程做准备的。在这里,就让我们看一下,如何使用这些接口进行函数式的流式API调用:

01 public static Integer calc(Integer para) {
02   try {
03     // 模拟一个长时间的执行
04     Thread.sleep(1000);
05   } catch (InterruptedException e) {
06   }
07   return para*para;
08 }
09
10 public static void main(String[] args) throws InterruptedException, ExecutionException
{
11   CompletableFuture<Void> fu=CompletableFuture.supplyAsync(() -> calc(50))
12     .thenApply((i)->Integer.toString(i))
13     .thenApply((str)->"\""+str+"\"")
14     .thenAccept(System.out::println);
15   fu.get();
16 }

上述代码中,使用supplyAsync()函数执行一个异步任务。接着连续使用流式调用对任务的处理结果进行再加工,直到最后的结果输出。

这里,我们在第15行执行CompletableFuture.get()方法,目的是等待calc()函数执行完成。如果不进行这个等待调用,由于CompletableFuture异步执行的缘故,主函数不等calc()方法执行完毕就会退出,随着主线程的结束,所有的Daemon线程都会立即退出,从而导致calc()方法无法正常完成。

6.5.4 CompletableFuture中的异常处理

如果CompletableFuture在执行过程中遇到异常,我们可以用函数式编程的风格来优雅地处理这些异常。CompletableFuture提供了一个异常处理方法exceptionally():

01 public static Integer calc(Integer para) {
02   return para / 0;
03 }
04
05 public static void main(String[] args) throws InterruptedException,ExecutionException {
06   CompletableFuture<Void> fu = CompletableFuture
07       .supplyAsync(() -> calc(50))
08       .exceptionally(ex -> {
09         System.out.println(ex.toString());
10         return 0;
11       })
12       .thenApply((i) -> Integer.toString(i))
13       .thenApply((str) -> "\"" + str + "\"")
14       .thenAccept(System.out::println);
15   fu.get();
16 }

在上述代码中,第8行对当前的CompletableFuture进行异常处理。如果没有异常发生,则CompletableFuture就会返回原有的结果。如果遇到了异常,就可以在exceptionally()中处理异常,并返回一个默认的值。在上例中,我们忽略了异常堆栈,只是简单地打印异常的信息。

执行上述函数,我们将得到输出:

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
"0"

6.5.5 组合多个CompletableFuture

CompletableFuture还允许你将多个CompletableFuture进行组合。一种方法是使用thenCompose(),它的签名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends
CompletionStage<U>> fn)

一个CompletableFuture可以在执行完成后,将执行结果通过Function传递给下一个CompletionStage进行处理(Function接口返回新的CompletionStage实例):

01 public static Integer calc(Integer para) {
02   return para/2;
03 }
04
05 public static void main(String[] args) throws InterruptedException, ExecutionException {
06   CompletableFuture<Void> fu =
07       CompletableFuture.supplyAsync(() -> calc(50))
08       .thenCompose((i)->CompletableFuture.supplyAsync(() -> calc(i)))
09       .thenApply((str)->"\"" + str + "\"").thenAccept(System.out::println);
10   fu.get();
11 }

上述代码第8行,将处理后的结果传递给thenCompose(),并进一步传递给后续新生成的CompletableFuture实例。以上代码的输出如下:

"12"

另外一种组合多个CompletableFuture的方法是thenCombine(),它的签名如下:

public <U,V> CompletableFuture<V> thenCombine
    (CompletionStage<? extends U> other,
     BiFunction<? super T,? super U,? extends V> fn)

方法thenCombine()首先完成当前CompletableFuture和other的执行。接着,将这两者的执行结果传递给BiFunction(该接口接收两个参数,并有一个返回值),并返回代表BiFunction实例的CompletableFuture对象:

01 public static Integer calc(Integer para) {
02   return para / 2;
03 }
04
05 public static void main(String[] args) throws InterruptedException,ExecutionException {
06   CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50));
07   CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc(25));
08
09   CompletableFuture<Void> fu = intFuture.thenCombinet(intFuture2, (i, j) -> (i + j))
10       .thenApply((str) -> "\"" + str + "\"")
11       .thenAccept(System.out::println);
12   fu.get();
13 }

上述代码中,首先生成两个CompletableFuture实例(第6~7行),接着使用thenCombine()组合这两个CompletableFuture,将两者的执行结果进行累加(由第9行的(i, j) -> (i + j)实现),并将其累加结果转为字符串,并输出。上述代码的输出是:

"37"

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文