递归实现 CompletableFuture

发布于 2025-01-14 04:00:38 字数 1132 浏览 4 评论 0原文

我有三个任务:

createSimulationScenarios()runSimulation()getSimulationOutput()

这些任务以迭代方式完成:

  1. createSimulationScenarios() 创建新的模拟文件
  2. runSimulation() 运行模拟。如果有很多模拟,可以并行批量执行。
  3. 模拟完成后,getSimulationOutput() 就会立即处理输出数据。只要特定的模拟已经完成,也可以并行进行。

然后,当所有模拟完成并获得所有输出时,createSimulationScenarios() 将根据前一批的输出创建一组新的模拟文件。依此类推...

因为 each runSimulation()each getSimulationOutput() 可以并行完成,我想到使用CompletableFuture来实现。

for (i = 0; i < noSimulations; ++i) {
    CompletableFuture<Void> runSim = CompletableFuture.runAsync(new runSimulation(), simulationExecutor);
    CompletableFuture<Void> getOutput = runSim.thenCompose(x ->
                                        CompletableFuture.runAsync(new getSimulationOutput(), outputExecutor));
}

但是,我陷入了如何将此块连接到另一个用于 createSimulationScenarios()CompletableFuture 的问题,并要求它仅在所有模拟和输出完成时才开始执行(其中是下一次迭代的开始)。

有什么想法吗?

I have three Tasks:

createSimulationScenarios(), runSimulation(), and getSimulationOutput()

These tasks are done in an iterative manner:

  1. createSimulationScenarios() creates new simulation files
  2. runSimulation() runs a simulation. If there are many simulations they can be executed in parallel batches.
  3. getSimulationOutput() crunches the output data as soon as a simulation is done. Can also be done in parallel as long as the specific simulation is already finished.

Then when all simulations are finished and all outputs are obtained, createSimulationScenarios() creates a new set of simulation files based on the previous batch's output. And so on...

Because each runSimulation() and each getSimulationOutput() can be done in parallel, I thought of using CompletableFuture for the implementation.

for (i = 0; i < noSimulations; ++i) {
    CompletableFuture<Void> runSim = CompletableFuture.runAsync(new runSimulation(), simulationExecutor);
    CompletableFuture<Void> getOutput = runSim.thenCompose(x ->
                                        CompletableFuture.runAsync(new getSimulationOutput(), outputExecutor));
}

However, I am stuck on how to connect this chunk to another CompletableFuture for createSimulationScenarios() with the requirement that it only starts to execute when all simulations and outputs have finished (which is the start of the next iteration).

Any thoughts?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

善良天后 2025-01-21 04:00:38

看看 ForkJoinTask,它似乎非常适合计算量大的任务,可以在不同的环境中以最佳方式运行

RecursiveTask 参考显示了一个简单的示例执行

Have a look at ForkJoinTask, it seems to fit perfectly for the computation-heavy tasks which scale to run optimally in different environments

RecursiveTask reference shows the example of a simple implementation

清风不识月 2025-01-21 04:00:38

您可以坚持使用迭代,而不是使用递归,但似乎无限循环正是您正在寻找的。此示例使用无限流。 CompletableFuture 调用可能需要根据所需的确切行为进行更改(翻译:我怀疑它会编译,只是显示代码可能是什么样子)。

Stream.generate(() -> createSimulationScenarios())
    .map(simulations -> IntStream.range(0, simulations.size())
       .mapToObj(i -> CompletableFuture.runAsync(new runSimulation(), simulationExecutor)
          .thenCompose(x -> CompletableFuture.runAsync(new getSimulationOutput(), 
                            outputExecutor)))
       .collect(Collectors.toList())) // force tasks to be submitted
    .forEach(futureList -> futureList.stream().forEach(CompletableFuture::join));

Instead of using recursion, you can stick with iteration, but it seems an infinite loop is what you're looking for. This example uses an infinite stream. The CompletableFuture calls will probably need to be changed according to the exact behavior desired (translation: I doubt it will compile, just showing what the code could look like).

Stream.generate(() -> createSimulationScenarios())
    .map(simulations -> IntStream.range(0, simulations.size())
       .mapToObj(i -> CompletableFuture.runAsync(new runSimulation(), simulationExecutor)
          .thenCompose(x -> CompletableFuture.runAsync(new getSimulationOutput(), 
                            outputExecutor)))
       .collect(Collectors.toList())) // force tasks to be submitted
    .forEach(futureList -> futureList.stream().forEach(CompletableFuture::join));
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文