Webflux 中,Reactor背压onBackpressureBuffery不起作用
最近在学习reactor,在学习背压时,看到有onBackpressureBuffer方法,但是配置了 buffer后好像不起效。代码如下,给定了一个生产速度远大于消费速度的情况,且buffer值较小,预期是抛出异常,但是却没有,请问是哪里的使用姿势不对吗?
@Test
public void test_onBackpressureBuffer() throws InterruptedException {
Flux.interval(Duration.of(10, ChronoUnit.MILLIS)) //每毫秒产生10个数
.onBackpressureBuffer(10) //设置buffer为10
.subscribe( //每秒消费一个数
i -> {
System.out.println(Thread.currentThread().getName() + " "+i);
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e.getMessage());
}
},
System.out::println
);
TimeUnit.MINUTES.sleep(10);
}
类似的逻辑,使用RxJava2,则会正常爆出异常:
Observable.interval(1, TimeUnit.MILLISECONDS)
.toFlowable(BackpressureStrategy.BUFFER)
.onBackpressureBuffer(256)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) {
}
});
TimeUnit.SECONDS.sleep(1);
异常信息如下:
0
1
2
io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
是不是背压策略的问题
回压的处理有以下几种策略:
ERROR: 当下游跟不上节奏的时候发出一个错误信号。
DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
LATEST:让下游只得到上游最新的元素。
BUFFER:缓存下游没有来得及处理的元素(如果缓存不限大小的可能导致OutOfMemoryError)。
这几种策略定义在枚举类型OverflowStrategy中,不过还有一个IGNORE类型,即完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。