如何顺序运行Kotlin Coroutines?

发布于 2025-02-10 00:58:45 字数 887 浏览 2 评论 0原文

我有一个coroutinescopelog()函数的实例,该函数看起来如下:

private val scope = CoroutineScope(Dispatchers.IO)

fun log(message: String) = scope.launch { // launching a coroutine
    println("$message")
    TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
}

我使用此测试代码启动Coroutines:

repeat(5) { item ->
    log("Log $item")
}

log()函数可以从任何位置,在任何thread中都从任何位置调用,但不能从coroutine中调用。

经过几次测试后,我可以看到如下的顺序结果:

Log 0
Log 2
Log 4
Log 1
Log 3

打印日志的顺序可能不同。如果我正确理解coroutines的执行不能保证是顺序的。这意味着可以在项目0的Coroutine之前启动项目2的Coroutine。

我希望为每个项目依次启动Coroutines,并且“某些阻止操作”将依次执行,以始终实现下一个日志:

Log 0
Log 1
Log 2
Log 3
Log 4

有没有办法使启动Coroutines顺序?也许还有其他方法可以实现我想要的东西?

在此先感谢您的任何帮助!

I have an instance of CoroutineScope and log() function which look like the following:

private val scope = CoroutineScope(Dispatchers.IO)

fun log(message: String) = scope.launch { // launching a coroutine
    println("$message")
    TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
}

And I use this test code to launch coroutines:

repeat(5) { item ->
    log("Log $item")
}

The log() function can be called from any place, in any Thread, but not from a coroutine.

After a couple of tests I can see not sequential result like the following:

Log 0
Log 2
Log 4
Log 1
Log 3

There can be different order of printed logs. If I understand correctly the execution of coroutines doesn't guarantee to be sequential. What it means is that a coroutine for item 2 can be launched before the coroutine for item 0.

I want that coroutines were launched sequentially for each item and "some blocking operation" would execute sequentially, to always achieve next logs:

Log 0
Log 1
Log 2
Log 3
Log 4

Is there a way to make launching coroutines sequential? Or maybe there are other ways to achieve what I want?

Thanks in advance for any help!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

暗恋未遂 2025-02-17 00:58:45

一种可能的策略是使用渠道按顺序加入启动的作业。您需要懒洋洋地启动工作,以便在加入之前不会启动。 Trysend在频道具有无限容量时始终成功。您需要使用trysend,因此可以从Coroutine外部调用。

private val lazyJobChannel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
    scope.launch {
        consumeEach { it.join() }
    }
}

fun log(message: String) {
    lazyJobChannel.trySend(
        scope.launch(start = CoroutineStart.LAZY) {
            println("$message")
            TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
        }
    )
}

One possible strategy is to use a Channel to join the launched jobs in order. You need to launch the jobs lazily so they don't start until join is called on them. trySend always succeeds when the Channel has unlimited capacity. You need to use trySend so it can be called from outside a coroutine.

private val lazyJobChannel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
    scope.launch {
        consumeEach { it.join() }
    }
}

fun log(message: String) {
    lazyJobChannel.trySend(
        scope.launch(start = CoroutineStart.LAZY) {
            println("$message")
            TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
        }
    )
}
心的憧憬 2025-02-17 00:58:45

因为 flow flow是顺序我们可以使用mutableSharedFlow依次收集和处理数据:

class Info {
    // make sure replay(in case some jobs were emitted before sharedFlow is being collected and could be lost)
    // and extraBufferCapacity are large enough to handle all the jobs. 
    // In case some jobs are lost try to increase either of the values.
    private val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    private val scope = CoroutineScope(Dispatchers.IO)

    init {
        sharedFlow.onEach { message ->
            println("$message")
            TimeUnit.MILLISECONDS.sleep(100) // some blocking or suspend operation
        }.launchIn(scope)
    }

    fun log(message: String) {
        sharedFlow.tryEmit(message) 
    }
}

fun test() {

    val info = Info()

    repeat(10) { item ->
        info.log("Log $item")
    }
}

它始终以正确的顺序打印日志:

Log 0
Log 1
Log 2
...
Log 9

它适用于所有情况,但是需要确保设置了足够的元素以replayextrabufferCapacity mutableSharedFlow的参数处理所有项目。


另一种方法是

使用dispatchers.io.limitedParallelism(1)作为coroutinescope的上下文。如果Coroutines不包含暂停功能的调用,并从相同的 thread ,例如主线程,它使得它们依次运行。因此,此解决方案仅与阻止(非代码>悬挂)操作启动>启动 coroutine builder:

private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))

fun log(message: String) = scope.launch { // launching a coroutine from the same Thread, e.g. Main Thread
    println("$message")
    TimeUnit.MILLISECONDS.sleep(100) // only blocking operation, not `suspend` operation
}

事实证明,单线程调度程序是FIFO执行者。因此,将coroutinescope执行到一个线程解决问题。

Since Flows are sequential we can use MutableSharedFlow to collect and handle data sequentially:

class Info {
    // make sure replay(in case some jobs were emitted before sharedFlow is being collected and could be lost)
    // and extraBufferCapacity are large enough to handle all the jobs. 
    // In case some jobs are lost try to increase either of the values.
    private val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    private val scope = CoroutineScope(Dispatchers.IO)

    init {
        sharedFlow.onEach { message ->
            println("$message")
            TimeUnit.MILLISECONDS.sleep(100) // some blocking or suspend operation
        }.launchIn(scope)
    }

    fun log(message: String) {
        sharedFlow.tryEmit(message) 
    }
}

fun test() {

    val info = Info()

    repeat(10) { item ->
        info.log("Log $item")
    }
}

It always prints the logs in the correct order:

Log 0
Log 1
Log 2
...
Log 9

It works for all cases, but need to be sure there are enough elements set to replay and extraBufferCapacity parameters of MutableSharedFlow to handle all items.


Another approach is

Using Dispatchers.IO.limitedParallelism(1) as a context for the CoroutineScope. It makes coroutines run sequentially if they don't contain calls to suspend functions and launched from the same Thread, e.g. Main Thread. So this solution works only with blocking (not suspend) operation inside launch coroutine builder:

private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))

fun log(message: String) = scope.launch { // launching a coroutine from the same Thread, e.g. Main Thread
    println("$message")
    TimeUnit.MILLISECONDS.sleep(100) // only blocking operation, not `suspend` operation
}

It turns out that the single thread dispatcher is a FIFO executor. So limiting the CoroutineScope execution to one thread solves the problem.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文