为什么在磁通完成后,项目反应器停止运行,为什么不进行调整器。

发布于 2025-02-05 06:55:20 字数 1147 浏览 4 评论 0原文

我有一个字符串 s的原始flux,并在main()方法中运行此代码。

package com.example;
    
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
    
import java.util.Arrays;
import java.util.List;
    
public class Parallel {
    
  private static final Logger log = Loggers.getLogger(Parallel.class.getName());

  private static List<String> COLORS = Arrays.asList("red", "white", "blue");

  public static void main(String[] args) throws InterruptedException {
    Flux<String> flux = Flux.fromIterable(COLORS);
    flux
      .log()
      .map(String::toUpperCase)
      .subscribeOn(Schedulers.newParallel("sub"))
      .publishOn(Schedulers.newParallel("pub", 1))
      .subscribe(value -> {
        log.info("==============Consumed: " + value);
      });
  }
}

如果您尝试运行此代码,则应用程序将永远不会停止运行,您需要手动停止它。 如果我将.newparallel()替换为.parallel()一切都按预期工作,并且应用程序正常完成。

为什么它不能独自完成?为什么悬挂? 这种行为的原因是什么?

如果您以JUNIT测试运行此代码,则可以正常运行,并且不会悬挂。

I have a primitive Flux of Strings, and run this code in the main() method.

package com.example;
    
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
    
import java.util.Arrays;
import java.util.List;
    
public class Parallel {
    
  private static final Logger log = Loggers.getLogger(Parallel.class.getName());

  private static List<String> COLORS = Arrays.asList("red", "white", "blue");

  public static void main(String[] args) throws InterruptedException {
    Flux<String> flux = Flux.fromIterable(COLORS);
    flux
      .log()
      .map(String::toUpperCase)
      .subscribeOn(Schedulers.newParallel("sub"))
      .publishOn(Schedulers.newParallel("pub", 1))
      .subscribe(value -> {
        log.info("==============Consumed: " + value);
      });
  }
}

If you try to run this code the app never stops running and you need to stop it manually.
If I replace .newParallel() with .parallel() everything works as expected and the app finishes normally.

Why can't it finish running on its own? Why does it hang?
What is the reason for this behavior?

If you run this code as a JUnit test it works fine and it doesn't hang.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

梅窗月明清似水 2025-02-12 06:55:20

调度程序您使用Newxxx Factory方法创建自己的实例是在 non-daemon 模式下创建的,这意味着它可以防止JVM从退出。

junit调用system.exit()在所有测试运行时,这说明了为什么测试方案不悬挂的原因。

在这种情况下,schedulers.newsingle()schedulers.newparallel()变体是最坏的“犯罪者”,因为在不活动超时之后,创建的线程不会被淘汰,与众不同。使用schedulers.newboundedelastic()

如果在现实世界中您有一个定义明确的应用程序生命周期,则可以存储调度程序实例(例如,bean),并确保每个调度程序#dispose() is在应用程序生命周期结束时打电话。

更轻松的解决方案:使用相关的工厂过载,使用daemon == True明确创建调度程序

Scheduler instances that you create yourself with newXxx factory methods are created in non-daemon mode by default, which means that it can prevent the JVM from exiting.

JUnit calls System.exit() when all the tests have run, which explains why the test scenario doesn't hang.

In this context, Schedulers.newSingle() and Schedulers.newParallel() variants are the worst "offenders", because the threads created are not culled after an inactivity timeout, unlike with Schedulers.newBoundedElastic().

If in a real world scenario you have a well defined application lifecycle, you could store the Scheduler instances somewhere (eg. as beans) and ensure each Scheduler#dispose() is called at the end of the application lifecycle.

Easier solution: create the Schedulers explicitly with daemon == true using the relevant factory overload.

誰ツ都不明白 2025-02-12 06:55:20

我要做的就是将处置放在订阅的完整处理程序上。

package example

import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import reactor.util.Loggers
import java.util.*

object Parallel

private val log = Loggers.getLogger(Parallel::class.java.name)

private val COLORS = listOf("red", "white", "blue")

fun main() {
    val flux = Flux.fromIterable(COLORS)
    val subScheduler = Schedulers.newParallel("sub")
    val pubScheduler = Schedulers.newParallel("pub", 1)
    flux
        .log()
        .map { obj: String ->
            obj.uppercase(
                Locale.getDefault()
            )
        }
        .subscribeOn(subScheduler)
        .publishOn(pubScheduler)
        .subscribe(
            { value ->
                log.info("==============Consumed: $value")
            },
            { err -> log.error("{}", err.message) },
            {
                subScheduler.dispose()
                pubScheduler.dispose()
            }
        )

}

What I would do is just put the dispose on the complete handler of the subscribe.

package example

import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import reactor.util.Loggers
import java.util.*

object Parallel

private val log = Loggers.getLogger(Parallel::class.java.name)

private val COLORS = listOf("red", "white", "blue")

fun main() {
    val flux = Flux.fromIterable(COLORS)
    val subScheduler = Schedulers.newParallel("sub")
    val pubScheduler = Schedulers.newParallel("pub", 1)
    flux
        .log()
        .map { obj: String ->
            obj.uppercase(
                Locale.getDefault()
            )
        }
        .subscribeOn(subScheduler)
        .publishOn(pubScheduler)
        .subscribe(
            { value ->
                log.info("==============Consumed: $value")
            },
            { err -> log.error("{}", err.message) },
            {
                subScheduler.dispose()
                pubScheduler.dispose()
            }
        )

}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文