Conditional parallelism in a Reactor project

Published 2025-02-03 00:26:47


There is a stream in which messages must be processed in parallel based on the value of a specific field.

Using groupBy isn't an option, because it collects a chunk of results before grouping them (it introduces latency).
Also, if subscribeOn(Schedulers.single()) is used, the result is okay, but it introduces a starvation problem.

As an example, orders with the same batchId should be executed sequentially. Thus, orders 1 and 2 should be processed sequentially, while order 3 can be executed in parallel.

    record Order(Integer id, Integer batchId) {}

    void testParallel() {
        Flux.just(new Order(1, 1), new Order(2, 1), new Order(3, 2));
    }
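For reference, the ordering constraint can be stated without Reactor at all. The sketch below is a plain-Java illustration (the `groupByBatch` helper is made up for this example, and the actual processing step is left abstract): it groups the orders from the snippet above by `batchId`, making explicit which ones must stay sequential.

```java
import java.util.*;
import java.util.stream.*;

public class BatchGrouping {
    record Order(Integer id, Integer batchId) {}

    // Group orders by batchId, preserving encounter order of both groups and elements.
    // Orders inside one group must be processed sequentially; distinct groups may run in parallel.
    static Map<Integer, List<Order>> groupByBatch(List<Order> orders) {
        return orders.stream().collect(Collectors.groupingBy(
                Order::batchId, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        Map<Integer, List<Order>> byBatch =
                groupByBatch(List.of(new Order(1, 1), new Order(2, 1), new Order(3, 2)));
        byBatch.forEach((batch, group) -> System.out.println("batch " + batch + " -> " + group));
    }
}
```

Here batch 1 contains orders 1 and 2 (sequential), and batch 2 contains only order 3 (free to run in parallel with batch 1).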


Comments (2)

以酷 2025-02-10 00:26:47


You should be able to create 2 separate Flux instances: one that processes each emitted item sequentially, and another that processes them in parallel. You can then merge them into a single Flux.

Something along the lines of

        Flux.just(1, 2, 3, 4, 5, 6)
                .collectList()
                .flatMapMany(list -> {
                    Stream<Integer> sequential = list.stream().filter(i -> i < 4);
                    Stream<Integer> parallel = list.stream().filter(i -> i > 3);
                    // doWork is a placeholder for your actual processing step,
                    // returning a Mono/Flux (e.g. Mono<Integer>).
                    Flux<Integer> sequentialFlux = Flux.fromStream(sequential).concatMap(i -> doWork(i));
                    Flux<Integer> parallelFlux = Flux.fromStream(parallel).flatMap(i -> doWork(i));

                    return Flux.merge(sequentialFlux, parallelFlux);
                }).log().subscribe();

In the example above, elements 1, 2, 3 will be processed sequentially, whereas elements 4, 5, 6 will be processed in parallel.

Note

You did not make it clear what conditional logic needs to be applied, so I have just applied some dummy logic for now.

Also, collectList() should only be used with finite streams.

concatMap docs - this operator waits for one inner to complete before generating the next one and subscribing to it.

merge docs - Unlike concat, sources are subscribed to eagerly
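The concatMap-vs-flatMap split described above has a plain-Java analogue that does not need Reactor: chain the "sequential" items so each starts only after the previous finishes, and submit the "parallel" items eagerly. The sketch below (pool size and the 1-3 / 4-6 split are illustrative assumptions) demonstrates that the sequential leg preserves order while the parallel leg does not have to.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SplitExecutionDemo {
    // Process items 1..3 strictly in order, 4..6 concurrently, then wait for everything.
    static List<Integer> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());

        // Sequential leg: each item starts only after the previous one completes (concatMap-like).
        CompletableFuture<Void> sequential = CompletableFuture.completedFuture(null);
        for (int i = 1; i <= 3; i++) {
            final int item = i;
            sequential = sequential.thenRunAsync(() -> processed.add(item), pool);
        }

        // Parallel leg: items are submitted eagerly and run independently (flatMap-like).
        CompletableFuture<?>[] all = Stream.concat(
                Stream.of(sequential),
                IntStream.rangeClosed(4, 6)
                        .mapToObj(i -> CompletableFuture.runAsync(() -> processed.add(i), pool)))
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(all).join();
        pool.shutdown();
        return processed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In the output, 1, 2, 3 always appear in that relative order, while 4, 5, 6 may be interleaved anywhere.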

为你拒绝所有暧昧 2025-02-10 00:26:47


The requirements are a bit vague, but it seems like bufferUntilChanged could do the trick. In your example, it would collect successive elements of one batch and then emit them.

flux.bufferUntilChanged(Order::batchId)

This returns a Flux<List<Order>>. You can process the lists in parallel.

windowUntilChanged could also be an option. It would return a Flux<Flux<Order>>.
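To make the batching rule concrete, here is a plain-Java model of what bufferUntilChanged does: a new buffer opens whenever the key differs from the previous element's key, so only *consecutive* elements with the same key end up in one buffer. The helper below is a made-up stand-in, not Reactor's implementation; only the batching rule matches the operator's documented behavior.

```java
import java.util.*;
import java.util.function.Function;

public class BufferUntilChangedDemo {
    record Order(Integer id, Integer batchId) {}

    // Model of bufferUntilChanged: group consecutive elements that share a key.
    static <T, K> List<List<T>> bufferUntilChanged(List<T> source, Function<T, K> keySelector) {
        List<List<T>> buffers = new ArrayList<>();
        K previousKey = null;
        for (T item : source) {
            K key = keySelector.apply(item);
            if (buffers.isEmpty() || !Objects.equals(key, previousKey)) {
                buffers.add(new ArrayList<>());  // key changed: start a new buffer
            }
            buffers.get(buffers.size() - 1).add(item);
            previousKey = key;
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<List<Order>> batches = bufferUntilChanged(
                List.of(new Order(1, 1), new Order(2, 1), new Order(3, 2)),
                Order::batchId);
        System.out.println(batches.size() + " batches");  // prints "2 batches": [1, 2] and [3]
    }
}
```

One caveat this model makes visible: if orders of the same batch arrive non-consecutively (e.g. batch keys 1, 2, 1), they land in separate buffers, so this approach relies on the stream delivering each batch contiguously.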
