反应堆:延迟不适用
因此,我想使用Flux来运行每个单声道任务并等待每个任务2秒。这是我尝试过的,
val test = { x: Int ->
Mono.just(x)
.log()
.doOnNext { println("Number: $it") }
}
Flux.fromIterable(listOf(1, 2, 3))
.flatMap { number ->
test(number)
.delayElement(Duration.ofSeconds(2))
}
.collectList()
//.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
.block()
这是您在时间戳延迟上看到的结果
01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()
,即使它在同一线程(测试工作者)上运行,也似乎无法正常工作。我在这里做错了什么?
编辑:我不太了解它的工作原理,但是我没有在Flatmap的内部添加延迟内容,而是在其顶部添加延迟元素,并且可以正常工作。
Flux.fromIterable(listOf(1, 2, 3))
.delayElements(Duration.ofSeconds(2))
.flatMap { number ->
test(number)
}
.collectList()
//.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
.block()
结果
03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35
So I want to use Flux to run each Mono task and wait each task for 2 seconds. Here's what I have tried
val test = { x: Int ->
Mono.just(x)
.log()
.doOnNext { println("Number: $it") }
}
Flux.fromIterable(listOf(1, 2, 3))
.flatMap { number ->
test(number)
.delayElement(Duration.ofSeconds(2))
}
.collectList()
//.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
.block()
Here's the result
01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()
As you can see on the timestamp delayElement doesn't seem to work even though it runs on the same thread (Test worker). What I did wrong here?
Edited: I don't really understand how it works but instead of adding delayElement inside of flatmap I add delayElements on top of it and it works fine.
Flux.fromIterable(listOf(1, 2, 3))
.delayElements(Duration.ofSeconds(2))
.flatMap { number ->
test(number)
}
.collectList()
//.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
.block()
the result
03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
问题在于,在第一个中,您正在执行Flatmap延迟,因为知道Flatmap是在Prefatch和并发256和32中,这意味着FlatMap中的代码将在Paralel中运行,并且所有项目将延迟2秒,但将执行。 (订阅)同时
如果您想将其放入其中,则需要与Prefatch 1使用ConcatMap 1,
这将使代码在内部等待,直到预览完成请求项目1 x 1,
但更干净的解决方案是在之后延迟它
The problem is that in the first one you are doing the delay in flatmap, Knowing that flatmap is with prefatch and concurrency 256 and 32 this means that the code in flatmap will run in paralel and all items will be delayed 2 seconds but will be executed (subscribed) in the same time
if you want to have it inside you need to use concatMap with prefatch 1
This will make the code inside wait till the preview finish requesting items 1 by 1
But more clean solution is to delay it after