了解链式 CompletableFuture 的并行执行
我有一个关于 Java Streams 和链式 CompletableFutures 如何执行的问题。
我的问题是这样的:如果我运行以下代码,则使用列表中的 10 个项目调用 execute()
大约需要 11 秒才能完成(列表中的项目数加 1)。这是因为我有两个线程并行工作:第一个线程执行 digItUp
操作,完成后,第二个线程执行 fillItBackIn
操作,第一个线程开始处理 < code>digItUp 位于列表中的下一项。
如果我注释掉第 36 行 (.collect(Collectors.toList())
),则 execute()
方法需要大约 20 秒才能完成。线程不是并行操作的;对于列表中的每个项目,digItUp
操作完成,然后 fillItBackIn
操作按顺序完成,然后再处理列表中的下一个项目。
我不清楚为什么排除 (.collect(Collectors.toList())
) 会改变这种行为。有人可以解释一下吗?
完整的类:
package com.test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class SimpleExample {
private final ExecutorService diggingThreadPool = Executors.newFixedThreadPool(1);
private final ExecutorService fillingThreadPool = Executors.newFixedThreadPool(1);
public SimpleExample() {
}
public static void main(String[] args) {
List<Double> holesToDig = new ArrayList<>();
Random random = new Random();
for (int c = 0; c < 10; c++) {
holesToDig.add(random.nextDouble(1000));
}
new SimpleExample().execute(holesToDig);
}
public void execute(List<Double> holeVolumes) {
long start = System.currentTimeMillis();
holeVolumes.stream()
.map(volume -> {
CompletableFuture<Double> digItUpCF = CompletableFuture.supplyAsync(() -> digItUp(volume), diggingThreadPool);
return digItUpCF.thenApplyAsync(volumeDugUp -> fillItBackIn(volumeDugUp), fillingThreadPool);
})
.collect(Collectors.toList())
.forEach(cf -> {
Double volume = cf.join();
System.out.println("Dug a hole and filled it back in. Net volume: " + volume);
});
System.out.println("Dug up and filled back in " + holeVolumes.size() + " holes in " + (System.currentTimeMillis() - start) + " ms");
}
public Double digItUp(Double volume) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("Dug hole with volume " + volume);
return volume;
}
public Double fillItBackIn(Double volumeDugUp) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("Filled back in hole of volume " + volumeDugUp);
return 0.0;
}
}
I have a question about how Java Streams and chained CompletableFutures perform.
My question is this: if I run the following code, calling execute()
with 10 items in the list takes ~11 seconds to complete (number of items in the list plus 1). This is because I have two threads working in parallel: the first executes the digItUp
operation, and once that's complete, the second executes the fillItBackIn
operation, and the first starts processing digItUp
on the next item in the list.
If I comment out line 36 (.collect(Collectors.toList())
), the execute()
method takes ~20 seconds to complete. The threads do not operate in parallel; for each item in the list, the digItUp
operation completes, and then the fillItBackIn
operation completes in sequence before the next item in the list is processed.
It's unclear to me why the exclusion of (.collect(Collectors.toList())
) should change this behavior. Can someone explain?
The complete class:
package com.test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class SimpleExample {
private final ExecutorService diggingThreadPool = Executors.newFixedThreadPool(1);
private final ExecutorService fillingThreadPool = Executors.newFixedThreadPool(1);
public SimpleExample() {
}
public static void main(String[] args) {
List<Double> holesToDig = new ArrayList<>();
Random random = new Random();
for (int c = 0; c < 10; c++) {
holesToDig.add(random.nextDouble(1000));
}
new SimpleExample().execute(holesToDig);
}
public void execute(List<Double> holeVolumes) {
long start = System.currentTimeMillis();
holeVolumes.stream()
.map(volume -> {
CompletableFuture<Double> digItUpCF = CompletableFuture.supplyAsync(() -> digItUp(volume), diggingThreadPool);
return digItUpCF.thenApplyAsync(volumeDugUp -> fillItBackIn(volumeDugUp), fillingThreadPool);
})
.collect(Collectors.toList())
.forEach(cf -> {
Double volume = cf.join();
System.out.println("Dug a hole and filled it back in. Net volume: " + volume);
});
System.out.println("Dug up and filled back in " + holeVolumes.size() + " holes in " + (System.currentTimeMillis() - start) + " ms");
}
public Double digItUp(Double volume) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("Dug hole with volume " + volume);
return volume;
}
public Double fillItBackIn(Double volumeDugUp) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("Filled back in hole of volume " + volumeDugUp);
return 0.0;
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
原因是
collect(Collectors.toList())
是一个终端操作,因此它会触发流管道(请记住,流是延迟评估的)。因此,当您调用collect
时,所有CompletableFuture
实例都会被构造并放置在列表中。这意味着存在一条CompletableFuture
链,其中每一个又是由两个阶段组成的链,我们称它们为X和Y。每当第一个线程执行器完成一个X阶段时,它就是空闲地处理下一个组合的CompletableFuture的X阶段,而另一个线程执行器正在处理前一个CompletableFuture的Y阶段。这是我们直观地期望的结果。
另一方面,当您不调用
collect
时,在这种情况下forEach
就是终端操作。然而,在这种情况下,流中的每个元素都会按顺序处理(为了确认尝试切换到parallelStream()
),因此阶段 X 和 Y为第一个CompletableFuture
执行。只有当前一个流元素的阶段 Y 完成时,forEach
才会移动到流管道中的第二个元素,只有那时才会从原始流映射一个新的CompletableFuture
双
值。The reason is that
collect(Collectors.toList())
is a terminal operation, hence it triggers the stream pipeline (remember that streams are evaluated lazily). So when you callcollect
, all of theCompletableFuture
instances are constructed and placed in the list. This means that there is a chain ofCompletableFuture
, where each one is in turn a chain composed of two stages, let's call them X and Y.Every time the first thread executor finishes an X stage, it is free to process the X stage of the next composed
CompletableFuture
, while the other thread executor is processing stage Y of the previousCompletableFuture
. This is the result that we intuitively expect.On the other hand, when you don't call
collect
, thenforEach
is in this case the terminal operation. However, in this case every element in the stream is processed sequentially (to confirm try switching toparallelStream()
), hence stages X and Y get executed for the firstCompletableFuture
. Only when stage Y from the previous stream element is finished, willforEach
move to the second element in the stream pipeline, and only then will a newCompletableFuture
be mapped from the originalDouble
value.喜欢这个问题,M A 的回答很棒!我有一个类似的用例,我在那里使用 Rxjava。它的效果非常好,但我的同事向我提出挑战,要求我在没有它的情况下实施它。 TT
我测试了您的示例,并找到了一种解决方法,可以使其在没有
collect
的情况下具有相同的性能。诀窍是让 cf.join() 在另一个线程中执行。但我不得不说,这可能会导致潜在的问题,因为它缺乏对背压的支持......如果上游无限且快速,但消费者太慢,所有快速创建的
CompletableFuture
map
算子中的内容会被累加并提交到第一个diggingThreadPool,最终导致RejectedExecutionException、OOM等。Love this question and M A's answer is awesome! I had a similar use case, and I was using Rxjava there. It worked very well, but my colleagues challenged me to implement it without that. T.T
I tested your example and found a workaround to make it the same performance without
collect
. The trick is to let thecf.join()
be executed in another thread.But I have to say, this might lead to potential issues as it lacks the support for backpressure...if the upstream is infinite and fast, but the consumer is too slow, all the fast-created
CompletableFuture
in themap
operator would be accumulated and submitted to the first diggingThreadPool, finally causing RejectedExecutionException, OOM, etc.