如何将可观察的排放传递给MutableSharedFlow?

发布于 2025-01-25 03:23:56 字数 1681 浏览 2 评论 0原文

好吧,我有一个可观察的,我已经使用asflow()将其转换但不发出。 我正在尝试从RX和频道迁移到Flow,因此我有此功能

override fun processIntents(intents: Observable<Intent>) {
     intents.asFlow().shareTo(intentsFlow).launchIn(this)
}

shareto()是一个扩展功能,它oneach {recever.emit(it)}processintents存在于基本视图模型中,IntentsFlowmutableSharedFlow

fun <T> Flow<T>.shareTo(receiver: MutableSharedFlow<T>): Flow<T> {
    return onEach { receiver.emit(it) }
}

我想传递来自Intents可观察到IntentsFlow的排放,但它根本不起作用,并且单位测试持续失败。

@Test(timeout = 4000)
fun `WHEN processIntent() with Rx subject or Observable emissions THEN intentsFlow should receive them`() {
    return runBlocking {

        val actual = mutableListOf<TestNumbersIntent>()
        val intentSubject = PublishSubject.create<TestNumbersIntent>()


        val viewModel = FlowViewModel<TestNumbersIntent, TestNumbersViewState>(
            dispatcher = Dispatchers.Unconfined,
            initialViewState = TestNumbersViewState()
        )

        viewModel.processIntents(intentSubject)


        intentSubject.onNext(OneIntent)
        intentSubject.onNext(TwoIntent)
        intentSubject.onNext(ThreeIntent)

        viewModel.intentsFlow.take(3).toList(actual)

        assertEquals(3, actual.size)
        assertEquals(OneIntent, actual[0])
        assertEquals(TwoIntent, actual[1])
        assertEquals(ThreeIntent, actual[2])
    }
}

测试在4000毫秒之后定时出现 org.junit.runners.model.testtimedoutexception:测试时间 4000毫秒

well, I have an Observable, I’ve used asFlow() to convert it but doesn’t emit.
I’m trying to migrate from Rx and Channels to Flow, so I have this function

override fun processIntents(intents: Observable<Intent>) {
     intents.asFlow().shareTo(intentsFlow).launchIn(this)
}

shareTo() is an extension function which does onEach { receiver.emit(it) }, processIntents exists in a base ViewModel, and intentsFlow is a MutableSharedFlow.

fun <T> Flow<T>.shareTo(receiver: MutableSharedFlow<T>): Flow<T> {
    return onEach { receiver.emit(it) }
}

I want to pass emissions coming from the intents Observable to intentsFlow, but it doesn’t work at all and the unit test keeps failing.

@Test(timeout = 4000)
fun `WHEN processIntent() with Rx subject or Observable emissions THEN intentsFlow should receive them`() {
    return runBlocking {

        val actual = mutableListOf<TestNumbersIntent>()
        val intentSubject = PublishSubject.create<TestNumbersIntent>()


        val viewModel = FlowViewModel<TestNumbersIntent, TestNumbersViewState>(
            dispatcher = Dispatchers.Unconfined,
            initialViewState = TestNumbersViewState()
        )

        viewModel.processIntents(intentSubject)


        intentSubject.onNext(OneIntent)
        intentSubject.onNext(TwoIntent)
        intentSubject.onNext(ThreeIntent)

        viewModel.intentsFlow.take(3).toList(actual)

        assertEquals(3, actual.size)
        assertEquals(OneIntent, actual[0])
        assertEquals(TwoIntent, actual[1])
        assertEquals(ThreeIntent, actual[2])
    }
}

test timed out after 4000 milliseconds
org.junit.runners.model.TestTimedOutException: test timed out after
4000 milliseconds

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

浅暮の光 2025-02-01 03:23:56

这项工作

val ps = PublishSubject.create<Int>()
val mf = MutableSharedFlow<Int>()
val pf = ps.asFlow()
    .onEach {
        mf.emit(it)
    }
launch {
    pf.take(3).collect()
}
launch {
    mf.take(3).collect {
        println("$it") // Prints 1 2 3
    }
}
launch {
    yield() // Without this we suspend indefinitely
    ps.onNext(1)
    ps.onNext(2)
    ps.onNext(3)
}

我们需要以(3) s来确保我们的程序终止,因为mutableSharedFlow and publishsubject-ject -&gt;流无限期收集。

我们需要收益,因为我们正在使用一个线程,我们需要给其他Coroutines一个开始工作的机会。


取2

这是更好的。不使用,并自行清理。

发出最后一项后,在publishsubject terminates mutableSharedFlow集合上调用oncomplete。这是一个便利,因此当此代码运行时,它完全终止。这不是要求。您可以根据需要安排工作终止。

您的代码永不终止与mutableSharedFlow从未收集的排放无关。这些是单独的问题。首先是由于以下事实:既不是publishsubject的流量,也不是mutableSharedFlow自行终止。 PublishSubject流程将在onComplete时终止。 mutableSharedFlow将终止当Coroutine(特别是其job)终止它。

Flowpublishsubject.asflow()构建,等待排放。这引入了竞赛条件,即准备收集和调用代码PublishSubject.onnext()

我相信,这就是为什么流程收集未在代码中拾取Onnext排放的原因。

这就是为什么在我们启动从psf收集的Coroutine之后,需要立即需要A 收益

val ps = PublishSubject.create<Int>()
val msf = MutableSharedFlow<Int>()
val psf = ps.asFlow()
    .onEach {
        msf.emit(it)
    }
val j1 = launch {
    psf.collect()
}

yield() // Use this to allow psf.collect to catch up

val j2 = launch {
    msf.collect {
        println("$it") // Prints 1 2 3 4
    }
}

launch {
    ps.onNext(1)
    ps.onNext(2)
    ps.onNext(3)
    ps.onNext(4)
    ps.onComplete()
}
j1.invokeOnCompletion { j2.cancel() }
j2.join()

This works

val ps = PublishSubject.create<Int>()
val mf = MutableSharedFlow<Int>()
val pf = ps.asFlow()
    .onEach {
        mf.emit(it)
    }
launch {
    pf.take(3).collect()
}
launch {
    mf.take(3).collect {
        println("$it") // Prints 1 2 3
    }
}
launch {
    yield() // Without this we suspend indefinitely
    ps.onNext(1)
    ps.onNext(2)
    ps.onNext(3)
}

We need the take(3)s to make sure our program terminates, because MutableSharedFlow and PublishSubject -> Flow collect indefinitely.

We need the yield because we're working with a single thread and we need to give the other coroutines an opportunity to start working.


Take 2

This is much better. Doesn't use take, and cleans up after itself.

After emitting the last item, calling onComplete on the PublishSubject terminates MutableSharedFlow collection. This is a convenience, so that when this code runs it terminates completely. It is not a requirement. You can arrange your Job termination however you like.

Your code never terminating is not related to the emissions never being collected by the MutableSharedFlow. These are separate concerns. The first is due to the fact that neither a flow created from a PublishSubject, nor a MutableSharedFlow, terminates on its own. The PublishSubject flow will terminate when onComplete is called. The MutableSharedFlow will terminate when the coroutine (specifically, its Job) collecting it terminates.

The Flow constructed by PublishSubject.asFlow() drops any emissions if, at the time of the emission, collection of the Flow hasn't suspended, waiting for emissions. This introduces a race condition between being ready to collect and code that calls PublishSubject.onNext().

This, I believe, is the reason why flow collection isn't picking up the onNext emissions in your code.

It's why a yield is required right after we launch the coroutine that collects from psf.

val ps = PublishSubject.create<Int>()
val msf = MutableSharedFlow<Int>()
val psf = ps.asFlow()
    .onEach {
        msf.emit(it)
    }
val j1 = launch {
    psf.collect()
}

yield() // Use this to allow psf.collect to catch up

val j2 = launch {
    msf.collect {
        println("$it") // Prints 1 2 3 4
    }
}

launch {
    ps.onNext(1)
    ps.onNext(2)
    ps.onNext(3)
    ps.onNext(4)
    ps.onComplete()
}
j1.invokeOnCompletion { j2.cancel() }
j2.join()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文