Understanding parallel execution of chained CompletableFutures

Posted on 2025-01-11 20:30:48

I have a question about how Java Streams and chained CompletableFutures perform.

My question is this: if I run the following code, calling execute() with 10 items in the list takes ~11 seconds to complete (number of items in the list plus 1). This is because I have two threads working in parallel: the first executes the digItUp operation, and once that's complete, the second executes the fillItBackIn operation, and the first starts processing digItUp on the next item in the list.

If I comment out line 36 (.collect(Collectors.toList())), the execute() method takes ~20 seconds to complete. The threads do not operate in parallel; for each item in the list, the digItUp operation completes, and then the fillItBackIn operation completes in sequence before the next item in the list is processed.

It's unclear to me why the exclusion of (.collect(Collectors.toList())) should change this behavior. Can someone explain?

The complete class:

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SimpleExample {

    private final ExecutorService diggingThreadPool = Executors.newFixedThreadPool(1);
    private final ExecutorService fillingThreadPool = Executors.newFixedThreadPool(1);

    public SimpleExample() {

    }

    public static void main(String[] args) {
        List<Double> holesToDig = new ArrayList<>();
        Random random = new Random();
        for (int c = 0; c < 10; c++) {
            holesToDig.add(random.nextDouble(1000));
        }
        new SimpleExample().execute(holesToDig);
    }

    public void execute(List<Double> holeVolumes) {
        long start = System.currentTimeMillis();
        holeVolumes.stream()
                .map(volume -> {
                    CompletableFuture<Double> digItUpCF = CompletableFuture.supplyAsync(() -> digItUp(volume), diggingThreadPool);
                    return digItUpCF.thenApplyAsync(volumeDugUp -> fillItBackIn(volumeDugUp), fillingThreadPool);
                })
                .collect(Collectors.toList())
                .forEach(cf -> {
                    Double volume = cf.join();
                    System.out.println("Dug a hole and filled it back in.  Net volume: " + volume);
                });
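        System.out.println("Dug up and filled back in " + holeVolumes.size() + " holes in " + (System.currentTimeMillis() - start) + " ms");
        // Shut the pools down so their non-daemon worker threads do not keep the JVM alive.
        diggingThreadPool.shutdown();
        fillingThreadPool.shutdown();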
    }

    public Double digItUp(Double volume) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        System.out.println("Dug hole with volume " + volume);
        return volume;
    }

    public Double fillItBackIn(Double volumeDugUp) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        System.out.println("Filled back in hole of volume " + volumeDugUp);
        return 0.0;
    }
}


Comments (2)

甜点 2025-01-18 20:30:48

The reason is that collect(Collectors.toList()) is a terminal operation, so it triggers the whole stream pipeline (remember that streams are evaluated lazily). By the time collect returns, every CompletableFuture has been constructed, its first stage has already been submitted to diggingThreadPool, and the futures sit in the list. Each of them is a chain composed of two stages; let's call them X and Y.

Every time the first thread executor finishes an X stage, it is free to process the X stage of the next composed CompletableFuture, while the other thread executor is processing stage Y of the previous CompletableFuture. This is the result that we intuitively expect.

On the other hand, when you don't call collect, forEach is the terminal operation. A sequential stream then pushes one element at a time through the entire pipeline (to confirm, try switching to parallelStream()): map creates the first CompletableFuture, and forEach immediately calls cf.join() on it, which blocks until stages X and Y have both finished. Only then does the stream pull the second element, and only then is a new CompletableFuture mapped from the next Double value.
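The element-by-element behavior is easy to see without any futures at all. The sketch below is only an illustration (the class LazyStreamDemo and its trace method are made up for this purpose): it records the order in which map and forEach run, with and without an intermediate collect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyStreamDemo {

    // Hypothetical helper: returns the order in which map and forEach touch each element.
    static List<String> trace(boolean collectFirst) {
        List<String> events = new ArrayList<>();
        Stream<Integer> mapped = Stream.of(1, 2, 3).map(i -> {
            events.add("map " + i);
            return i;
        });
        if (collectFirst) {
            // collect is the terminal operation: every element is mapped before forEach starts.
            mapped.collect(Collectors.toList()).forEach(i -> events.add("forEach " + i));
        } else {
            // forEach is the terminal operation: each element is mapped and consumed in turn.
            mapped.forEach(i -> events.add("forEach " + i));
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));  // [map 1, map 2, map 3, forEach 1, forEach 2, forEach 3]
        System.out.println(trace(false)); // [map 1, forEach 1, map 2, forEach 2, map 3, forEach 3]
    }
}
```

In the question's code, "map" is where the two-stage future gets created and submitted, and "forEach" is where join() blocks, so the second ordering is exactly the one that serializes the work.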

墨离汐 2025-01-18 20:30:48

Love this question, and M A's answer is awesome! I had a similar use case where I was using RxJava. It worked very well, but my colleagues challenged me to implement it without that. T.T

I tested your example and found a workaround that gives the same performance without collect. The trick is to let cf.join() be executed on another thread.

// Run the join on another thread pool; omit the second argument to use the default ForkJoinPool.commonPool().
.forEach(cf -> CompletableFuture.supplyAsync(cf::join, anotherThreadpool)
        .thenAccept(v -> System.out.println("Dug a hole and filled it back in.  Net volume: " + v))
);
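One thing to watch with the snippet above: forEach now returns as soon as the tasks have been submitted, so anything placed after it (such as the elapsed-time print in execute()) runs before the work is done. A minimal sketch of one way to wait for everything, assuming it is acceptable to gather the futures into an array and use CompletableFuture.allOf; the class AwaitAllDemo, the slow helper, and the 50 ms sleep are stand-ins, not part of the original code:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AwaitAllDemo {

    // Hypothetical stand-in for digItUp / fillItBackIn, with a shorter sleep.
    static Double slow(Double v) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return v;
    }

    // Runs both stages for every volume and returns only after all of them have finished.
    static List<Double> run(List<Double> volumes) {
        ExecutorService digging = Executors.newFixedThreadPool(1);
        ExecutorService filling = Executors.newFixedThreadPool(1);
        Queue<Double> finished = new ConcurrentLinkedQueue<>();
        try {
            // toArray is a terminal operation, so every chain is created and submitted up front.
            CompletableFuture<?>[] all = volumes.stream()
                    .map(v -> CompletableFuture.supplyAsync(() -> slow(v), digging)
                            .thenApplyAsync(AwaitAllDemo::slow, filling)
                            .thenAccept(finished::add))
                    .toArray(CompletableFuture[]::new);
            // Block once, for all chains, instead of joining inside forEach.
            CompletableFuture.allOf(all).join();
        } finally {
            digging.shutdown();
            filling.shutdown();
        }
        return List.copyOf(finished);
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<Double> done = run(List.of(1.0, 2.0, 3.0, 4.0, 5.0));
        System.out.println(done.size() + " items in " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

Note that this still materializes all the futures before waiting, which is essentially what collect does in the original code.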

But I have to say, this can cause problems because it has no support for backpressure. If the upstream is infinite and fast but the consumer is slow, the CompletableFutures created in the map operator pile up in diggingThreadPool's queue, eventually causing RejectedExecutionException, OOM, etc.
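If RxJava is not an option, a crude form of backpressure can be approximated with a java.util.concurrent.Semaphore that blocks the producer once too many chains are unfinished. This is just a sketch under that assumption, not a full replacement for a reactive library; BoundedSubmitDemo, run, and the maxInFlight parameter are hypothetical names, and the two stages are trivial placeholders.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitDemo {

    // Hypothetical helper: submits `count` two-stage tasks while keeping at most
    // `maxInFlight` of them unfinished, and returns the highest number seen in flight.
    static int run(int count, int maxInFlight) {
        ExecutorService stageX = Executors.newFixedThreadPool(1);
        ExecutorService stageY = Executors.newFixedThreadPool(1);
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; i++) {
                permits.acquire(); // the producer blocks here when the limit is reached
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                final int item = i;
                CompletableFuture.supplyAsync(() -> item, stageX)
                        .thenApplyAsync(v -> v * 2, stageY)
                        .whenComplete((v, err) -> {
                            // Free the slot only after the whole chain has finished.
                            inFlight.decrementAndGet();
                            permits.release();
                            done.countDown();
                        });
            }
            done.await(); // wait for the remaining chains before shutting down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            stageX.shutdown();
            stageY.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The reported peak never exceeds the limit passed in (4 here).
        System.out.println("Peak in flight: " + run(50, 4));
    }
}
```

The limit trades throughput for bounded memory: with maxInFlight of 2 or more, both single-threaded stages can still overlap, while the executor queues can never grow beyond that many pending tasks.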
