反应堆流 - 覆盖旧值,仅保留最新的

发布于 2025-02-06 19:58:11 字数 3076 浏览 2 评论 0原文

目前,我正在尝试使用项目弹簧反应堆实施标准的反应案例:生产者比消费者快。如果已经可用新价值,则消费者绝不应与旧值一起使用(例如:过时的股票价格不感兴趣)。

在我的代码示例中,我有一个生产商,每100ms 每100ms生成新值。但是消费者需要500ms 进行处理。由于在消费者中的处理之间已经出现了几个新值,因此只有消费者/订户的最新值对我而言是有趣的,而不是过时的中间值。

per limitrate(1)我试图一次向生产者要求一个值,而per onbackpressureLatest()我想忽略中间值。两者都没有根据需要工作。

正确的解决方案是什么?

@Test
void fluxTest(){
    
    Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {

        int i = ai.incrementAndGet();

        if (i > 10) {
            sink.complete();
        } else {
            System.out.println(Thread.currentThread()+": generate & emit value "+i);
            sink.next(i);
        }
        sleep(100);
        return ai;
    });

    flux
            .publishOn(Schedulers.parallel())
            .onBackpressureLatest()
            .limitRate(1)
            .subscribe(i -> {
                System.out.println(Thread.currentThread()+": Receive: " + i); // do something with generated and processed item
                sleep(500);
            });

    sleep(10000);
}

void sleep(int ms){
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

目前的结果:

14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 2
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 3
Thread[parallel-1,5,main]: Receive: 4
Thread[parallel-1,5,main]: Receive: 5
Thread[parallel-1,5,main]: Receive: 6
Thread[parallel-1,5,main]: Receive: 7
Thread[parallel-1,5,main]: Receive: 8
Thread[parallel-1,5,main]: Receive: 9
Thread[parallel-1,5,main]: Receive: 10

Process finished with exit code 0

我的预期结果是:

14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 5
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 10

Process finished with exit code 0

At the moment I am trying to implement a standard reactive case with Project Spring Reactor: The producer is faster than the consumer. The consumer should never work with old values if new ones are already available (example: outdated stock prices are not of interest).

In my code example I have a producer that generates a new value every 100ms. But the consumer needs 500ms for processing. Since between processing in the consumer already many several new values arise, only the newest values for the consumer/subscriber would be interesting for me and not the outdated intermediate values.

Per limitRate(1) I tried to request only one value at a time to the producer and per onBackPressureLatest() I wanted to ignore intermediate values. Both did not work as desired.

What would be the correct solution?

@Test
void fluxTest(){
    
    Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {

        int i = ai.incrementAndGet();

        if (i > 10) {
            sink.complete();
        } else {
            System.out.println(Thread.currentThread()+": generate & emit value "+i);
            sink.next(i);
        }
        sleep(100);
        return ai;
    });

    flux
            .publishOn(Schedulers.parallel())
            .onBackpressureLatest()
            .limitRate(1)
            .subscribe(i -> {
                System.out.println(Thread.currentThread()+": Receive: " + i); // do something with generated and processed item
                sleep(500);
            });

    sleep(10000);
}

void sleep(int ms){
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Current result:

14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 2
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 3
Thread[parallel-1,5,main]: Receive: 4
Thread[parallel-1,5,main]: Receive: 5
Thread[parallel-1,5,main]: Receive: 6
Thread[parallel-1,5,main]: Receive: 7
Thread[parallel-1,5,main]: Receive: 8
Thread[parallel-1,5,main]: Receive: 9
Thread[parallel-1,5,main]: Receive: 10

Process finished with exit code 0

My expected result would be:

14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 5
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 10

Process finished with exit code 0

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

辞慾 2025-02-13 19:58:11

我发现了一种通过单声道中的磁通流中起作用的解决方案。

我的代码:

Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {

    int i = ai.incrementAndGet();

    if (i > 10) {
        sink.complete();
    } else {
        System.out.println(Thread.currentThread()+": generate & emit value "+i);
        sink.next(i);
    }
    sleep(100);
    return ai;
});

Disposable subscribe = flux
        .publishOn(Schedulers.parallel())
        .onBackpressureLatest()
        .flatMap(next ->
                Mono.just(next)
                        .subscribeOn(Schedulers.single()), 1, 1)
        .subscribe(i -> {
            System.out.println(Thread.currentThread() + ": Receive: " + i); // do something with generated and processed item
            sleep(500);
        });

while(!subscribe.isDisposed());

现在结果看起来如预期:

22:51:42.624 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[single-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[main,5,main]: generate & emit value 6
Thread[single-1,5,main]: Receive: 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[single-1,5,main]: Receive: 10

Process finished with exit code 0

Under

Latest overflow strategy with size 1 or any alternatives

I have discovered a solution which works within the Flux Stream via Mono.just creation.

My code:

Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {

    int i = ai.incrementAndGet();

    if (i > 10) {
        sink.complete();
    } else {
        System.out.println(Thread.currentThread()+": generate & emit value "+i);
        sink.next(i);
    }
    sleep(100);
    return ai;
});

Disposable subscribe = flux
        .publishOn(Schedulers.parallel())
        .onBackpressureLatest()
        .flatMap(next ->
                Mono.just(next)
                        .subscribeOn(Schedulers.single()), 1, 1)
        .subscribe(i -> {
            System.out.println(Thread.currentThread() + ": Receive: " + i); // do something with generated and processed item
            sleep(500);
        });

while(!subscribe.isDisposed());

Now the results looks as expected:

22:51:42.624 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[single-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[main,5,main]: generate & emit value 6
Thread[single-1,5,main]: Receive: 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[single-1,5,main]: Receive: 10

Process finished with exit code 0
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文