如何将可观察的排放传递给MutableSharedFlow?
好吧,我有一个可观察的,我已经使用asflow()将其转换但不发出。 我正在尝试从RX和频道迁移到Flow,因此我有此功能
override fun processIntents(intents: Observable<Intent>) {
intents.asFlow().shareTo(intentsFlow).launchIn(this)
}
shareto()
是一个扩展功能,它oneach {recever.emit(it)}
,processintents
存在于基本视图模型中,IntentsFlow
是mutableSharedFlow
。
fun <T> Flow<T>.shareTo(receiver: MutableSharedFlow<T>): Flow<T> {
return onEach { receiver.emit(it) }
}
我想传递来自Intents
可观察到IntentsFlow
的排放,但它根本不起作用,并且单位测试持续失败。
@Test(timeout = 4000)
fun `WHEN processIntent() with Rx subject or Observable emissions THEN intentsFlow should receive them`() {
return runBlocking {
val actual = mutableListOf<TestNumbersIntent>()
val intentSubject = PublishSubject.create<TestNumbersIntent>()
val viewModel = FlowViewModel<TestNumbersIntent, TestNumbersViewState>(
dispatcher = Dispatchers.Unconfined,
initialViewState = TestNumbersViewState()
)
viewModel.processIntents(intentSubject)
intentSubject.onNext(OneIntent)
intentSubject.onNext(TwoIntent)
intentSubject.onNext(ThreeIntent)
viewModel.intentsFlow.take(3).toList(actual)
assertEquals(3, actual.size)
assertEquals(OneIntent, actual[0])
assertEquals(TwoIntent, actual[1])
assertEquals(ThreeIntent, actual[2])
}
}
测试在4000毫秒之后定时出现 org.junit.runners.model.testtimedoutexception:测试时间 4000毫秒
well, I have an Observable, I’ve used asFlow() to convert it but doesn’t emit.
I’m trying to migrate from Rx and Channels to Flow, so I have this function
override fun processIntents(intents: Observable<Intent>) {
intents.asFlow().shareTo(intentsFlow).launchIn(this)
}
shareTo()
is an extension function which does onEach { receiver.emit(it) }
, processIntents
exists in a base ViewModel, and intentsFlow
is a MutableSharedFlow
.
fun <T> Flow<T>.shareTo(receiver: MutableSharedFlow<T>): Flow<T> {
return onEach { receiver.emit(it) }
}
I want to pass emissions coming from the intents
Observable to intentsFlow
, but it doesn’t work at all and the unit test keeps failing.
@Test(timeout = 4000)
fun `WHEN processIntent() with Rx subject or Observable emissions THEN intentsFlow should receive them`() {
return runBlocking {
val actual = mutableListOf<TestNumbersIntent>()
val intentSubject = PublishSubject.create<TestNumbersIntent>()
val viewModel = FlowViewModel<TestNumbersIntent, TestNumbersViewState>(
dispatcher = Dispatchers.Unconfined,
initialViewState = TestNumbersViewState()
)
viewModel.processIntents(intentSubject)
intentSubject.onNext(OneIntent)
intentSubject.onNext(TwoIntent)
intentSubject.onNext(ThreeIntent)
viewModel.intentsFlow.take(3).toList(actual)
assertEquals(3, actual.size)
assertEquals(OneIntent, actual[0])
assertEquals(TwoIntent, actual[1])
assertEquals(ThreeIntent, actual[2])
}
}
test timed out after 4000 milliseconds
org.junit.runners.model.TestTimedOutException: test timed out after
4000 milliseconds
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这项工作
我们需要
以(3)
s来确保我们的程序终止,因为mutableSharedFlow
andpublishsubject-ject -&gt;流
无限期收集。我们需要
收益
,因为我们正在使用一个线程,我们需要给其他Coroutines一个开始工作的机会。取2
这是更好的。不使用
取
,并自行清理。发出最后一项后,在
publishsubject
terminatesmutableSharedFlow
集合上调用oncomplete
。这是一个便利,因此当此代码运行时,它完全终止。这不是要求。您可以根据需要安排工作终止。您的代码永不终止与
mutableSharedFlow
从未收集的排放无关。这些是单独的问题。首先是由于以下事实:既不是publishsubject
的流量,也不是mutableSharedFlow
自行终止。PublishSubject
流程将在onComplete
时终止。mutableSharedFlow
将终止当Coroutine(特别是其job
)终止它。Flow
由publishsubject.asflow()
构建,等待排放。这引入了竞赛条件,即准备收集和调用代码PublishSubject.onnext()
。我相信,这就是为什么流程收集未在代码中拾取
Onnext
排放的原因。这就是为什么在我们启动从
psf
收集的Coroutine之后,需要立即需要A收益
。This works
We need the
take(3)
s to make sure our program terminates, becauseMutableSharedFlow
andPublishSubject -> Flow
collect indefinitely.We need the
yield
because we're working with a single thread and we need to give the other coroutines an opportunity to start working.Take 2
This is much better. Doesn't use
take
, and cleans up after itself.After emitting the last item, calling
onComplete
on thePublishSubject
terminatesMutableSharedFlow
collection. This is a convenience, so that when this code runs it terminates completely. It is not a requirement. You can arrange your Job termination however you like.Your code never terminating is not related to the emissions never being collected by the
MutableSharedFlow
. These are separate concerns. The first is due to the fact that neither a flow created from aPublishSubject
, nor aMutableSharedFlow
, terminates on its own. ThePublishSubject
flow will terminate whenonComplete
is called. TheMutableSharedFlow
will terminate when the coroutine (specifically, itsJob
) collecting it terminates.The
Flow
constructed byPublishSubject.asFlow()
drops any emissions if, at the time of the emission, collection of theFlow
hasn't suspended, waiting for emissions. This introduces a race condition between being ready to collect and code that callsPublishSubject.onNext()
.This, I believe, is the reason why flow collection isn't picking up the
onNext
emissions in your code.It's why a
yield
is required right after we launch the coroutine that collects frompsf
.