为什么反应堆不同时处理每个元素?

发布于 2025-01-22 07:27:49 字数 1307 浏览 2 评论 0原文

我期望的是,反应堆将为每个发射元素创建线程,这意味着元素1,2,2,3,4,5应同时处理源。但从我的演示代码输出来看,这不是真的,但是我不知道为什么。有人可以看一看并向我解释两件事:

  1. 为什么我的演示代码中的反应堆处理同步时尚?
  2. 如何使反应堆如何处理每个元素并发

尽管我以下演示代码中的反应堆链是与主线程异步的,但来自源flux的每个元素都以同步的方式发射。

这是我的演示代码

        System.out.println("main thread start ...");

        Flux.range(1,5)
                .flatMap(num->{
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return Mono.just(num);
                }).flatMap(num-> Mono.just(num*10) )
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(res-> System.out.println("Thread name:" + Thread.currentThread().getName()+" value:" + res));


        System.out.println("main thread sleep a little");
        Thread.sleep(4000);
        System.out.println("main thread end ...");

,这里是输出

        Output:
            main thread start ...
            main thread sleep a little
            0. element: 0
            1. element: 1
            main thread end ...
            2. element: 2

What I expect is reactor will create threads for each emitted element, meaning elements1, 2, 3, 4, 5 from the source should be handled concurrently. But it's not true from my demo code output, but I don't know why. Could someone take a look and explain to me for two things:

  1. Why does reactor in my demo code handles elements in synchronize fashion?
  2. How to make reactor handles each element concurrent?

Though the reactor chain in my below demo code is async to the main thread, each element from source flux emits in a synchronized way.

Here is my demo code

        System.out.println("main thread start ...");

        Flux.range(1,5)
                .flatMap(num->{
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return Mono.just(num);
                }).flatMap(num-> Mono.just(num*10) )
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(res-> System.out.println("Thread name:" + Thread.currentThread().getName()+" value:" + res));


        System.out.println("main thread sleep a little");
        Thread.sleep(4000);
        System.out.println("main thread end ...");

Here is the output

        Output:
            main thread start ...
            main thread sleep a little
            0. element: 0
            1. element: 1
            main thread end ...
            2. element: 2

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

你在看孤独的风景 2025-01-29 07:27:49

您的代码并非真正以反应性的方式实现,尤其是用于包装非反应性代码。 如果运行它,请考虑稍微返工的示例

@Test
void concurrent() {
    Flux<Integer> stream = Flux.range(1, 50)
            .flatMap(this::process)
            .flatMap(num -> Mono.just(num * 10));

    StepVerifier.create(stream)
            .expectNextCount(50)
            .verifyComplete();
}

private Mono<Integer> process(int num) {
    return Mono.fromCallable(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return num;
    })
    .log()
    .subscribeOn(Schedulers.boundedElastic());
}

,您会发现执行正在使用queues.small_buffer_size

22:54:26.066  [boundedElastic-50] INFO [r.M.L.50] - | request(32)
22:54:27.038  [boundedElastic-6] INFO [r.M.L.6] - | onNext(6)
22:54:27.038  [boundedElastic-11] INFO [r.M.L.11] - | onNext(11)
22:54:27.038  [boundedElastic-2] INFO [r.M.L.2] - | onNext(2)
22:54:27.038  [boundedElastic-9] INFO [r.M.L.9] - | onNext(9)
22:54:27.038  [boundedElastic-3] INFO [r.M.L.3] - | onNext(3)
22:54:27.040  [boundedElastic-13] INFO [r.M.L.13] - | onNext(13)
22:54:27.040  [boundedElastic-7] INFO [r.M.L.7] - | onNext(7)
22:54:27.040  [boundedElastic-20] INFO [r.M.L.20] - | onNext(20)
22:54:27.041  [boundedElastic-6] INFO [r.M.L.6] - | onComplete()
22:54:27.041  [boundedElastic-13] INFO [r.M.L.13] - | onComplete()
22:54:27.043  [boundedElastic-2] INFO [r.M.L.2] - | onComplete()
22:54:27.043  [boundedElastic-3] INFO [r.M.L.3] - | onComplete()
22:54:27.043  [boundedElastic-20] INFO [r.M.L.20] - | onComplete()
22:54:27.043  [boundedElastic-11] INFO [r.M.L.11] - | onComplete()
22:54:27.043  [boundedElastic-7] INFO [r.M.L.7] - | onComplete()
22:54:27.044  [boundedElastic-9] INFO [r.M.L.9] - | onComplete()
22:54:27.045  [boundedElastic-1] INFO [r.M.L.1] - | onNext(1)
22:54:27.045  [boundedElastic-5] INFO [r.M.L.5] - | onNext(5)
22:54:27.045  [boundedElastic-15] INFO [r.M.L.15] - | onNext(15)
22:54:27.045  [boundedElastic-1] INFO [r.M.L.1] - | onComplete()
22:54:27.045  [boundedElastic-15] INFO [r.M.L.15] - | onComplete()

博客/2019/12/13/fiperflux-3-s-thopping-threads and-schedulers“ rel =“ nofollow noreferrer”> flux 3的飞行3-跳线和调度程序以获取更多详细信息解释和更多例子。

Your code is not really implemented in a reactive way especially for wrapping non-reactive code. Consider slightly reworked example

@Test
void concurrent() {
    Flux<Integer> stream = Flux.range(1, 50)
            .flatMap(this::process)
            .flatMap(num -> Mono.just(num * 10));

    StepVerifier.create(stream)
            .expectNextCount(50)
            .verifyComplete();
}

private Mono<Integer> process(int num) {
    return Mono.fromCallable(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return num;
    })
    .log()
    .subscribeOn(Schedulers.boundedElastic());
}

If you run it, you would see that execution is happening on multiple threads with concurrency of Queues.SMALL_BUFFER_SIZE

22:54:26.066  [boundedElastic-50] INFO [r.M.L.50] - | request(32)
22:54:27.038  [boundedElastic-6] INFO [r.M.L.6] - | onNext(6)
22:54:27.038  [boundedElastic-11] INFO [r.M.L.11] - | onNext(11)
22:54:27.038  [boundedElastic-2] INFO [r.M.L.2] - | onNext(2)
22:54:27.038  [boundedElastic-9] INFO [r.M.L.9] - | onNext(9)
22:54:27.038  [boundedElastic-3] INFO [r.M.L.3] - | onNext(3)
22:54:27.040  [boundedElastic-13] INFO [r.M.L.13] - | onNext(13)
22:54:27.040  [boundedElastic-7] INFO [r.M.L.7] - | onNext(7)
22:54:27.040  [boundedElastic-20] INFO [r.M.L.20] - | onNext(20)
22:54:27.041  [boundedElastic-6] INFO [r.M.L.6] - | onComplete()
22:54:27.041  [boundedElastic-13] INFO [r.M.L.13] - | onComplete()
22:54:27.043  [boundedElastic-2] INFO [r.M.L.2] - | onComplete()
22:54:27.043  [boundedElastic-3] INFO [r.M.L.3] - | onComplete()
22:54:27.043  [boundedElastic-20] INFO [r.M.L.20] - | onComplete()
22:54:27.043  [boundedElastic-11] INFO [r.M.L.11] - | onComplete()
22:54:27.043  [boundedElastic-7] INFO [r.M.L.7] - | onComplete()
22:54:27.044  [boundedElastic-9] INFO [r.M.L.9] - | onComplete()
22:54:27.045  [boundedElastic-1] INFO [r.M.L.1] - | onNext(1)
22:54:27.045  [boundedElastic-5] INFO [r.M.L.5] - | onNext(5)
22:54:27.045  [boundedElastic-15] INFO [r.M.L.15] - | onNext(15)
22:54:27.045  [boundedElastic-1] INFO [r.M.L.1] - | onComplete()
22:54:27.045  [boundedElastic-15] INFO [r.M.L.15] - | onComplete()

Also, you could check Flight of the Flux 3 - Hopping Threads and Schedulers for more detailed explanation and more examples.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文