如何使用协程延迟迭代大型(大于内存)Kotlin 序列

发布于 2025-01-17 05:54:59 字数 737 浏览 1 评论 0原文

我有一个延迟生成的大序列,并且太大而无法放入内存中。

我想使用协程来处理这个序列以提高性能,在本例中我使用 10 个并行线程进行处理。

            runBlocking(Dispatchers.IO.limitedParallelism(10)) {
                massiveLazySequenceOfThings.forEach { myThingToProcess ->
                    print("I am processing $myThingToProcess")
                    launch() {
                        process(myThingToProcess)
                    }
                }
            }

这里的问题是,第一个打印语句将对序列中的每个项目执行,因此对于像我这样的非常大的序列,这将导致 OOM。

在这个例子中,有没有办法让我的序列的迭代变得“惰性”,以便在任何时候只处理固定的数字?

我是否被迫在此处使用通道(可能使用缓冲通道?)在序列迭代期间强制执行阻塞调用,直到处理某些项目为止?或者我还缺少其他一些更清洁的解决方案吗?

在我的实际示例中,我还使用supervisorScope来监视每个处理作业,因此如果可能的话我也想保留它。

I have a large sequence that is lazily generated, and is far too big to fit in memory.

I would like to process this sequence using coroutines to improve performance, in this example I am using 10 parallel threads for processing.

            runBlocking(Dispatchers.IO.limitedParallelism(10)) {
                massiveLazySequenceOfThings.forEach { myThingToProcess ->
                    print("I am processing $myThingToProcess")
                    launch() {
                        process(myThingToProcess)
                    }
                }
            }

The problem here is that the first print statement will be executed for EVERY item in the sequence, so for extremely large sequences like mine, this will cause OOM.

Is there no way to make the iteration of my sequence "lazy" in this example, so that only a fixed number are being processed at any one time?

Am I forced to use channels here (possibly with a buffered channel?) to force a blocking call during my sequence iteration until some items are processed? Or is there some other cleaner solution that I am missing.

In my actualt example, I am also using a supervisorScope to monitor each processing job, so If possible I would like to preserve that as well.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

歌枕肩 2025-01-24 05:55:00

您可以使用 信号量

runBlocking(Dispatchers.IO) {
    val semaphore = Semaphore(permits = 10)
    massiveLazySequenceOfThings.forEach { myThingToProcess ->
        semaphore.acquire()
        print("I am processing $myThingToProcess")
        launch {
            try {
                process(myThingToProcess)
            } finally {
                print("I am done processing $myThingToProcess")
                semaphore.release()
            }
        }
    }
}

You can limit the number of parallel operations with Semaphore.

runBlocking(Dispatchers.IO) {
    val semaphore = Semaphore(permits = 10)
    massiveLazySequenceOfThings.forEach { myThingToProcess ->
        semaphore.acquire()
        print("I am processing $myThingToProcess")
        launch {
            try {
                process(myThingToProcess)
            } finally {
                print("I am done processing $myThingToProcess")
                semaphore.release()
            }
        }
    }
}
白衬杉格子梦 2025-01-24 05:55:00

问题可能是您安排新任务的速度比并行完成任务的速度要快。

您可以通过将数据分块到可以并行处理的大小(在您的示例中为 10)并等待每个块完成然后再开始下一个块来克服此问题:

runBlocking(Dispatchers.IO) {
    massiveLazySequenceOfThings.chunked(10).forEach { chunk ->
        println("I am processing $chunk")
        chunk.map { async { process(it) } }.awaitAll()
    }
}

现在一个块中的 10 个元素中的每一个并行处理,但每个块作为一个整体是按顺序处理的,确保您不会因计划任务太多而耗尽内存。

一个小旁注:您的示例中的输出 "I am Doneprocessing $myThingToProcess" 不正确。该作业可能仍在其自己的线程中运行。你唯一可以肯定的是你已经安排好了。

The problem is probably that you are faster scheduling new tasks than it is possible to finish the tasks in parallel.

You can overcome this problem by chunking your data into the size that can be processed in parallel (in your example that is 10) and wait for each chunk to finish before you start with the next chunk:

runBlocking(Dispatchers.IO) {
    massiveLazySequenceOfThings.chunked(10).forEach { chunk ->
        println("I am processing $chunk")
        chunk.map { async { process(it) } }.awaitAll()
    }
}

Now each of the 10 elements in a chunk is processed in parallel, but each chunk as a whole is processed sequentially, making sure that you do not run out of memory due to too much scheduled tasks.

One little side note: The output "I am done processing $myThingToProcess" in your example is not correct. The job may still be running in its own thread. The only thing you can say for sure is that you scheduled it.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文