反应堆:延迟不适用

发布于 2025-01-24 13:04:06 字数 2750 浏览 2 评论 0原文

因此,我想使用Flux来运行每个单声道任务并等待每个任务2秒。这是我尝试过的,

val test = { x: Int ->
      Mono.just(x)
        .log()
        .doOnNext { println("Number: $it") }
    }


Flux.fromIterable(listOf(1, 2, 3))
    .flatMap { number ->
      test(number)
         .delayElement(Duration.ofSeconds(2))
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
   .block()

这是您在时间戳延迟上看到的结果

01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()

,即使它在同一线程(测试工作者)上运行,也似乎无法正常工作。我在这里做错了什么?

编辑:我不太了解它的工作原理,但是我没有在Flatmap的内部添加延迟内容,而是在其顶部添加延迟元素,并且可以正常工作。

Flux.fromIterable(listOf(1, 2, 3))
    .delayElements(Duration.ofSeconds(2))
    .flatMap { number ->
      test(number)
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
   .block()

结果

03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35

So I want to use Flux to run each Mono task and wait each task for 2 seconds. Here's what I have tried

val test = { x: Int ->
      Mono.just(x)
        .log()
        .doOnNext { println("Number: $it") }
    }


Flux.fromIterable(listOf(1, 2, 3))
    .flatMap { number ->
      test(number)
         .delayElement(Duration.ofSeconds(2))
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
   .block()

Here's the result

01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()

As you can see on the timestamp delayElement doesn't seem to work even though it runs on the same thread (Test worker). What I did wrong here?

Edited: I don't really understand how it works but instead of adding delayElement inside of flatmap I add delayElements on top of it and it works fine.

Flux.fromIterable(listOf(1, 2, 3))
    .delayElements(Duration.ofSeconds(2))
    .flatMap { number ->
      test(number)
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
   .block()

the result

03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

探春 2025-01-31 13:04:06

问题在于,在第一个中,您正在执行Flatmap延迟,因为知道Flatmap是在Prefatch和并发256和32中,这意味着FlatMap中的代码将在Paralel中运行,并且所有项目将延迟2秒,但将执行。 (订阅)同时
如果您想将其放入其中,则需要与Prefatch 1使用ConcatMap 1,

Flux.fromIterable(listOf(1, 2, 3))
.concatmap ( number ->
  test(number)
     .delayElement(Duration.ofSeconds(2))
,1)
.collectList()
.block()

这将使代码在内部等待,直到预览完成请求项目1 x 1,

但更干净的解决方案是在之后延迟它

The problem is that in the first one you are doing the delay in flatmap, Knowing that flatmap is with prefatch and concurrency 256 and 32 this means that the code in flatmap will run in paralel and all items will be delayed 2 seconds but will be executed (subscribed) in the same time
if you want to have it inside you need to use concatMap with prefatch 1

Flux.fromIterable(listOf(1, 2, 3))
.concatmap ( number ->
  test(number)
     .delayElement(Duration.ofSeconds(2))
,1)
.collectList()
.block()

This will make the code inside wait till the preview finish requesting items 1 by 1

But more clean solution is to delay it after

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文