为什么反应堆不同时处理每个元素?
我期望的是,反应堆将为每个发射元素创建线程,这意味着元素1,2,2,3,4,5
应同时处理源。但从我的演示代码输出来看,这不是真的,但是我不知道为什么。有人可以看一看并向我解释两件事:
- 为什么我的演示代码中的反应堆处理同步时尚?
- 如何使反应堆如何处理每个元素并发?
尽管我以下演示代码中的反应堆链是与主线程异步的,但来自源flux
的每个元素都以同步的方式发射。
这是我的演示代码
System.out.println("main thread start ...");
Flux.range(1,5)
.flatMap(num->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just(num);
}).flatMap(num-> Mono.just(num*10) )
.subscribeOn(Schedulers.boundedElastic())
.subscribe(res-> System.out.println("Thread name:" + Thread.currentThread().getName()+" value:" + res));
System.out.println("main thread sleep a little");
Thread.sleep(4000);
System.out.println("main thread end ...");
,这里是输出
Output:
main thread start ...
main thread sleep a little
0. element: 0
1. element: 1
main thread end ...
2. element: 2
What I expect is reactor will create threads for each emitted element, meaning elements1, 2, 3, 4, 5
from the source should be handled concurrently. But it's not true from my demo code output, but I don't know why. Could someone take a look and explain to me for two things:
- Why does reactor in my demo code handles elements in synchronize fashion?
- How to make reactor handles each element concurrent?
Though the reactor chain in my below demo code is async to the main thread, each element from source flux
emits in a synchronized way.
Here is my demo code
System.out.println("main thread start ...");
Flux.range(1,5)
.flatMap(num->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just(num);
}).flatMap(num-> Mono.just(num*10) )
.subscribeOn(Schedulers.boundedElastic())
.subscribe(res-> System.out.println("Thread name:" + Thread.currentThread().getName()+" value:" + res));
System.out.println("main thread sleep a little");
Thread.sleep(4000);
System.out.println("main thread end ...");
Here is the output
Output:
main thread start ...
main thread sleep a little
0. element: 0
1. element: 1
main thread end ...
2. element: 2
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您的代码并非真正以反应性的方式实现,尤其是用于包装非反应性代码。 如果运行它,请考虑稍微返工的示例
,您会发现执行正在使用
queues.small_buffer_size
博客/2019/12/13/fiperflux-3-s-thopping-threads and-schedulers“ rel =“ nofollow noreferrer”> flux 3的飞行3-跳线和调度程序以获取更多详细信息解释和更多例子。
Your code is not really implemented in a reactive way especially for wrapping non-reactive code. Consider slightly reworked example
If you run it, you would see that execution is happening on multiple threads with concurrency of
Queues.SMALL_BUFFER_SIZE
Also, you could check Flight of the Flux 3 - Hopping Threads and Schedulers for more detailed explanation and more examples.