Webflux 中,Reactor背压onBackpressureBuffery不起作用

发布于 2022-09-11 15:06:06 字数 3016 浏览 21 评论 0

最近在学习reactor,在学习背压时,看到有onBackpressureBuffer方法,但是配置了 buffer后好像不起效。代码如下,给定了一个生产速度远大于消费速度的情况,且buffer值较小,预期是抛出异常,但是却没有,请问是哪里的使用姿势不对吗?

    @Test
    public void test_onBackpressureBuffer() throws InterruptedException {

        Flux.interval(Duration.of(10, ChronoUnit.MILLIS))  //每毫秒产生10个数
            .onBackpressureBuffer(10)                              //设置buffer为10
            .subscribe(                                            //每秒消费一个数 
                    i -> {
                        System.out.println(Thread.currentThread().getName() + " "+i);
                        try {
                           Thread.sleep(1000);
                        } catch (Exception e) {
                            System.out.println(e.getMessage());
                        }
                    },
                    System.out::println
            );
        TimeUnit.MINUTES.sleep(10);
    }

类似的逻辑,使用RxJava2,则会正常爆出异常:

 Observable.interval(1, TimeUnit.MILLISECONDS)
                .toFlowable(BackpressureStrategy.BUFFER)
                .onBackpressureBuffer(256)
                .observeOn(Schedulers.newThread())
                .subscribe(
                        i -> {
                            System.out.println(i);
                            try {
                                Thread.sleep(100);
                            } catch (Exception e) {
                            }
                        });
        TimeUnit.SECONDS.sleep(1);

异常信息如下:

0
1
2
io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

慵挽 2022-09-18 15:06:07

是不是背压策略的问题

回压的处理有以下几种策略:

ERROR: 当下游跟不上节奏的时候发出一个错误信号。
DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
LATEST:让下游只得到上游最新的元素。
BUFFER:缓存下游没有来得及处理的元素(如果缓存不限大小的可能导致OutOfMemoryError)。
这几种策略定义在枚举类型OverflowStrategy中,不过还有一个IGNORE类型,即完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文