Springboot Coroutine Bean范围或本地范围

发布于 2025-01-21 21:01:52 字数 2192 浏览 0 评论 0 原文

我有一个要求,我们希望通过Coroutine不同步处理一些上游请求/有效载荷。我看到有几种方法可以做到这一点,但是想知道哪种方法是正确的方法 -

  1. 提供明确的春季服务类,以实现CoroutinesCope
  2. Autowire Singleton范围示波器范围 - 由某些定义的线程池调度程序支持。
  3. 定义方法本地 coroutinescope

在此

fun testSuspensions(count: Int) {
  val launchTime = measureTimeMillis {
    val parentJob = CoroutineScope(Dispatchers.IO).launch {
      repeat(count) {
        this.launch {
          process()  //Some lone running process
        }
      }
    }
  }
}

局部范围,那么折衷的

@KafkaListener(
    topics = ["test_topic"],
    concurrency = "1",
    containerFactory = "someListenerContainerConfig"
  )
  private fun testKafkaListener(consumerRecord: ConsumerRecord<String, ByteArray>, ack: Acknowledgment) {
    try {
      this.coroutineScope.launch {
        consumeRecordAsync(consumerRecord)
      }
    } finally {
      ack.acknowledge()
    }
  }

  suspend fun consumeRecordAsync(record: ConsumerRecord<String, ByteArray>) {
    println("[${Thread.currentThread().name}] Starting to consume record - ${record.key()}")
    val statusCode = initiateIO(record) // Add error-handling depending on kafka topic commit semantics.

    // Chain any-other business logic (depending on status-code) as suspending functions.
    consumeStatusCode(record.key(), statusCode)
  }

  suspend fun initiateIO(record: ConsumerRecord<String, ByteArray>): Int {
    return withContext(Dispatchers.IO) {  // Switch context to IO thread for http.
      println("[${Thread.currentThread().name}] Executing network call - ${record.key()}")
      delay(1000 * 2) // Simulate IO call
      200 // Return status-code
    }
  }

  suspend fun consumeStatusCode(recordKey: String, statusCode: Int) {
    delay(1000 * 1) // Simulate work.
    println("[${Thread.currentThread().name}] consumed record - $recordKey, status-code - $statusCode")
  }

>问题,我想知道如果我们定义下面的 上游配置类 -

@Bean(name = ["testScope"])
  fun defineExtensionScope(): CoroutineScope {
    val threadCount: Int = 4
    return CoroutineScope(Executors.newFixedThreadPool(threadCount).asCoroutineDispatcher())
  }

I have a requirement, where we want to asynchronously handle some upstream request/payload via coroutine. I see that there are several ways to do this, but wondering which is the right approach -

  1. Provide explicit spring service class that implements CoroutineScope
  2. Autowire singleton scope-context backed by certain defined thread-pool dispatcher.
  3. Define method local CoroutineScope object

Following on this question, I'm wondering whats the trade-off if we define method local scopes like below -

fun testSuspensions(count: Int) {
  val launchTime = measureTimeMillis {
    val parentJob = CoroutineScope(Dispatchers.IO).launch {
      repeat(count) {
        this.launch {
          process()  //Some lone running process
        }
      }
    }
  }
}

Alternative approach to autowire explicit scope object backed by custom dispatcher -

@KafkaListener(
    topics = ["test_topic"],
    concurrency = "1",
    containerFactory = "someListenerContainerConfig"
  )
  private fun testKafkaListener(consumerRecord: ConsumerRecord<String, ByteArray>, ack: Acknowledgment) {
    try {
      this.coroutineScope.launch {
        consumeRecordAsync(consumerRecord)
      }
    } finally {
      ack.acknowledge()
    }
  }

  suspend fun consumeRecordAsync(record: ConsumerRecord<String, ByteArray>) {
    println("[${Thread.currentThread().name}] Starting to consume record - ${record.key()}")
    val statusCode = initiateIO(record) // Add error-handling depending on kafka topic commit semantics.

    // Chain any-other business logic (depending on status-code) as suspending functions.
    consumeStatusCode(record.key(), statusCode)
  }

  suspend fun initiateIO(record: ConsumerRecord<String, ByteArray>): Int {
    return withContext(Dispatchers.IO) {  // Switch context to IO thread for http.
      println("[${Thread.currentThread().name}] Executing network call - ${record.key()}")
      delay(1000 * 2) // Simulate IO call
      200 // Return status-code
    }
  }

  suspend fun consumeStatusCode(recordKey: String, statusCode: Int) {
    delay(1000 * 1) // Simulate work.
    println("[${Thread.currentThread().name}] consumed record - $recordKey, status-code - $statusCode")
  }

Autowiring bean as follows in some upstream config class -

@Bean(name = ["testScope"])
  fun defineExtensionScope(): CoroutineScope {
    val threadCount: Int = 4
    return CoroutineScope(Executors.newFixedThreadPool(threadCount).asCoroutineDispatcher())
  }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

旧伤慢歌 2025-01-28 21:01:52

这取决于您的目标。如果您只想避免使用每次重新要求的模型,则可以使用Spring对 suppend 在控制器中的函数(通过使用WebFlux)的支持,这可以消除甚至从使用外部范围中使用的外部范围总而言之:

suspend fun testSuspensions(count: Int) {
  val execTime = measureTimeMillis {
    coroutineScope {
      repeat(count) {
        launch {
          process()  // some long running process
        }
      }
    }
  }
  // all child coroutines are done at this point
}

如果您真的希望您的方法立即返回并安排超过它的旋律,那么您确实需要额外的范围。

关于选项1),将自定义类实现 coroutinescope 不再鼓励(据我所知)。通常建议使用构图(将范围声明为属性,而不是通过自己的类实现接口)。因此,我建议您的选项2。

我想说的是选项3)不可能的问题,因为使用 coroutinescope(dispatchers.io)。这没有比使用 globalscope.launch(dispatchers.io){...} (它具有相同的陷阱) - 您可以阅读有关 globalsCope in 其文档。

主要问题是您在结构化的并发之外运行Coroutines(您的跑步统治不是父母工作的孩子,如果表现不佳,并且您忘记了它们,则可能会积累并拥有资源)。通常,最好定义一个范围,该范围在您不再需要由它运行的任何coroutines时被取消,以便您可以清洁Rogue Coroutines。

就是说,在某些情况下,您确实需要“永远”(在应用程序的一生中)运行Coroutines。在这种情况下,如果需要自定义线程池或异常处理程序之类的内容,则可以使用 globalsCope 。但是,无论如何,都不只是在现场创建一个范围,只是为了启动Coroutine而不保留其范围。

就您而言,似乎您没有清楚的时刻不再关心长期运行的Coroutines,因此您可能对您的Coroutines可以永远生存并且永远不会被取消。在这种情况下,我建议您在组件中连接一个自定义的范围范围。

It depends on what your goal is. If you just want to avoid the thread-per-request model, you can use Spring's support for suspend functions in controllers instead (by using webflux), and that removes the need from even using an external scope at all:

suspend fun testSuspensions(count: Int) {
  val execTime = measureTimeMillis {
    coroutineScope {
      repeat(count) {
        launch {
          process()  // some long running process
        }
      }
    }
  }
  // all child coroutines are done at this point
}

If you really want your method to return immediately and schedule coroutines that outlive it, you indeed need that extra scope.

Regarding option 1), making custom classes implement CoroutineScope is not encouraged anymore (as far as I understood). It's usually suggested to use composition instead (declare a scope as a property instead of implementing the interface by your own classes). So I would suggest your option 2.

I would say option 3) is out of the question, because there is no point in using CoroutineScope(Dispatchers.IO).launch { ... }. It's no better than using GlobalScope.launch(Dispatchers.IO) { ... } (it has the same pitfalls) - you can read about the pitfalls of GlobalScope in its documentation.

The main problem being that you run your coroutines outside structured concurrency (your running coroutines are not children of a parent job and may accumulate and hold resources if they are not well behaved and you forget about them). In general it's better to define a scope that is cancelled when you no longer need any of the coroutines that are run by it, so you can clean rogue coroutines.

That said, in some circumstances you do need to run coroutines "forever" (for the whole life of your application). In that case it's ok to use GlobalScope, or a custom application-wide scope if you need to customize things like the thread pool or exception handler. But in any case don't create a scope on the spot just to launch a coroutine without keeping a handle to it.

In your case, it seems you have no clear moment when you wouldn't care about the long running coroutines anymore, so you may be ok with the fact that your coroutines can live forever and are never cancelled. In that case, I would suggest a custom application-wide scope that you would wire in your components.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文