Reactor Flux - overwrite old values, keep only the latest
At the moment I am trying to implement a standard reactive case with Project Reactor: the producer is faster than the consumer. The consumer should never work with old values if newer ones are already available (for example, outdated stock prices are of no interest).
In my code example, the producer generates a new value every 100 ms, but the consumer needs 500 ms to process one. Several new values arrive while the consumer is still processing, so only the newest value is of interest to the consumer/subscriber, not the outdated intermediate ones.
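To pin down the behavior described above independently of Reactor, here is a minimal plain-Java sketch of a single-slot holder in which a new value overwrites whatever is still waiting. The class name LatestSlot and its methods publish and poll are hypothetical names chosen for illustration; they are not part of Reactor or of the original code.

```java
import java.util.concurrent.atomic.AtomicReference;

public class LatestSlot<T> {

    // Holds at most one pending value; null means "nothing new".
    private final AtomicReference<T> slot = new AtomicReference<>();

    /** Producer side: overwrite any value that has not been consumed yet. */
    public void publish(T value) {
        slot.set(value);
    }

    /** Consumer side: take the newest value and clear the slot, or return null if empty. */
    public T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestSlot<Integer> prices = new LatestSlot<>();
        for (int i = 1; i <= 5; i++) {
            prices.publish(i);          // values 1..4 are overwritten before anyone reads them
        }
        System.out.println(prices.poll()); // prints 5
        System.out.println(prices.poll()); // prints null
    }
}
```

A slow consumer that calls poll() after each processing step would only ever see the most recent value, which is the behavior the expected output further down describes.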
With limitRate(1) I tried to request only one value at a time from the producer, and with onBackpressureLatest() I wanted to discard intermediate values. Neither worked as desired.
What would be the correct solution?
@Test
void fluxTest() {
    Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {
        int i = ai.incrementAndGet();
        if (i > 10) {
            sink.complete();
        } else {
            System.out.println(Thread.currentThread() + ": generate & emit value " + i);
            sink.next(i);
        }
        sleep(100);
        return ai;
    });
    flux
        .publishOn(Schedulers.parallel())
        .onBackpressureLatest()
        .limitRate(1)
        .subscribe(i -> {
            System.out.println(Thread.currentThread() + ": Receive: " + i); // do something with generated and processed item
            sleep(500);
        });
    sleep(10000);
}

void sleep(int ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Current result:
14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 2
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 3
Thread[parallel-1,5,main]: Receive: 4
Thread[parallel-1,5,main]: Receive: 5
Thread[parallel-1,5,main]: Receive: 6
Thread[parallel-1,5,main]: Receive: 7
Thread[parallel-1,5,main]: Receive: 8
Thread[parallel-1,5,main]: Receive: 9
Thread[parallel-1,5,main]: Receive: 10
Process finished with exit code 0
My expected result would be:
14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 5
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 10
Process finished with exit code 0
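One possible reading of the current result, offered as an interpretation of the operator order rather than a confirmed diagnosis: publishOn sits upstream of onBackpressureLatest() and uses its default prefetch, so the generated values queue up inside publishOn and are handed to the slow subscriber one by one, and onBackpressureLatest() never sees a moment without demand. The sketch below assumes reactor-core is on the classpath. It moves onBackpressureLatest() ahead of publishOn, gives publishOn a prefetch of 1, and drops limitRate(1). The class name LatestOnlySketch and the run() helper are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LatestOnlySketch {

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs the 100 ms producer against the 500 ms consumer and returns what the consumer saw. */
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        // Same generator as in the question: values 1..10, one every 100 ms.
        Flux<Integer> flux = Flux.<Integer, AtomicInteger>generate(AtomicInteger::new, (ai, sink) -> {
            int i = ai.incrementAndGet();
            if (i > 10) {
                sink.complete();
            } else {
                sink.next(i);
            }
            sleep(100);
            return ai;
        });

        flux
            .onBackpressureLatest()               // keeps only the newest value while downstream has no demand
            .publishOn(Schedulers.parallel(), 1)  // prefetch 1: ask upstream for one value at a time
            .subscribe(i -> {
                received.add(i);
                sleep(500);                       // slow consumer
            }, error -> finished.countDown(), finished::countDown);

        try {
            finished.await(10, TimeUnit.SECONDS); // wait for completion (or give up after 10 s)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run());
    }
}
```

With these timings the consumer should see only a few of the ten values, starting with 1 and ending with 10. Which intermediate value gets through depends on thread timing, so no exact output is claimed here.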
Comments (1)
Under "Latest overflow strategy with size 1 or any alternatives" I found a solution that works within the Flux stream by creating a Mono.just.
My code:
Now the results look as expected: