为什么在磁通完成后,项目反应器停止运行,为什么不进行调整器。
我有一个字符串
s的原始flux
,并在main()
方法中运行此代码。
package com.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
import java.util.Arrays;
import java.util.List;
public class Parallel {
private static final Logger log = Loggers.getLogger(Parallel.class.getName());
private static List<String> COLORS = Arrays.asList("red", "white", "blue");
public static void main(String[] args) throws InterruptedException {
Flux<String> flux = Flux.fromIterable(COLORS);
flux
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub", 1))
.subscribe(value -> {
log.info("==============Consumed: " + value);
});
}
}
如果您尝试运行此代码,则应用程序将永远不会停止运行,您需要手动停止它。 如果我将.newparallel()
替换为.parallel()
一切都按预期工作,并且应用程序正常完成。
为什么它不能独自完成?为什么悬挂? 这种行为的原因是什么?
如果您以JUNIT测试运行此代码,则可以正常运行,并且不会悬挂。
I have a primitive Flux
of String
s, and run this code in the main()
method.
package com.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
import java.util.Arrays;
import java.util.List;
public class Parallel {
private static final Logger log = Loggers.getLogger(Parallel.class.getName());
private static List<String> COLORS = Arrays.asList("red", "white", "blue");
public static void main(String[] args) throws InterruptedException {
Flux<String> flux = Flux.fromIterable(COLORS);
flux
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub", 1))
.subscribe(value -> {
log.info("==============Consumed: " + value);
});
}
}
If you try to run this code the app never stops running and you need to stop it manually.
If I replace .newParallel()
with .parallel()
everything works as expected and the app finishes normally.
Why can't it finish running on its own? Why does it hang?
What is the reason for this behavior?
If you run this code as a JUnit test it works fine and it doesn't hang.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
调度程序
您使用Newxxx
Factory方法创建自己的实例是在 non-daemon 模式下创建的,这意味着它可以防止JVM从退出。junit调用
system.exit()
在所有测试运行时,这说明了为什么测试方案不悬挂的原因。在这种情况下,
schedulers.newsingle()
和schedulers.newparallel()
变体是最坏的“犯罪者”,因为在不活动超时之后,创建的线程不会被淘汰,与众不同。使用schedulers.newboundedelastic()
。如果在现实世界中您有一个定义明确的应用程序生命周期,则可以存储
调度程序
实例(例如,bean),并确保每个调度程序#dispose()
is在应用程序生命周期结束时打电话。更轻松的解决方案:使用相关的工厂过载,使用
daemon == True
明确创建调度程序
。Scheduler
instances that you create yourself withnewXxx
factory methods are created in non-daemon mode by default, which means that it can prevent the JVM from exiting.JUnit calls
System.exit()
when all the tests have run, which explains why the test scenario doesn't hang.In this context,
Schedulers.newSingle()
andSchedulers.newParallel()
variants are the worst "offenders", because the threads created are not culled after an inactivity timeout, unlike withSchedulers.newBoundedElastic()
.If in a real world scenario you have a well defined application lifecycle, you could store the
Scheduler
instances somewhere (eg. as beans) and ensure eachScheduler#dispose()
is called at the end of the application lifecycle.Easier solution: create the
Schedulers
explicitly withdaemon == true
using the relevant factory overload.我要做的就是将处置放在订阅的完整处理程序上。
What I would do is just put the dispose on the complete handler of the subscribe.