如何合并两个 Observables,以便当任何一个 Observables 完成时结果也完成?
我有这样的代码:
var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));
ss.Subscribe(_ => Console.WriteLine("Next"));
s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(
我已经使用 OnError(new OperationCanceledException()) 解决了我的问题,但我想要一个更好的解决方案(必须有一个组合器,对吧?)。
I have this code:
var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));
ss.Subscribe(_ => Console.WriteLine("Next"));
s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(
I've solved my problem using OnError(new OperationCanceledException()), but I'd like a better solution (there has to be a combinator right?).
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
或者这个,这也很简洁:
它使用一个主题,它将确保只有一个 OnCompleted 被推送到 CreateWithDisposable() 中的观察者;
Or this, which is also quite neat:
This uses a subject which will make sure only one OnCompleted is pushed to the observer in the CreateWithDisposable();
我建议将 onCompleted 事件转换为 onNext 事件并使用 var ss = s1.Merge(s2).TakeUntil(s1ors2complete) ,其中 s1ors2complete 生成一个值,而不是在任一流完成时重写 Merge 以完成当 s1 或 s2 结束时。您也可以仅链接
.TakeUntil(s1completes).TakeUntil(s2completes)
而不是创建 s1ors2complete。此方法提供了比 MergeWithCompleteOnEither 扩展更好的组合,因为它可用于将任何“两个完成时完成”运算符修改为“任何一个完成时完成”运算符。至于如何将 onNext 事件转换为 onCompleted 事件,有几种方法可以实现。 CompositeDisposable 方法听起来是一个不错的方法,经过一些搜索发现这个有趣的线程关于 在 onNext、onError 和 onCompleted 通知之间转换。我可能会使用 xs.SkipWhile(_ => true).concat(Observable.Return(True)) 创建一个名为 ReturnTrueOnCompleted 的扩展方法,然后您的合并将变为:
您还可以查看使用像 Zip 这样的运算符 当输入流之一完成时自动完成。
Instead of re-writing Merge to finish when either stream completes I would suggest converting the onCompleted events to onNext events and using
var ss = s1.Merge(s2).TakeUntil(s1ors2complete)
where s1ors2complete produces a value when either s1 or s2 ends. You could also just chain.TakeUntil(s1completes).TakeUntil(s2completes)
instead of creating s1ors2complete. This approach provides better composition than a MergeWithCompleteOnEither extension as it can be used to modify any "complete when both complete" operator into a "complete when any completes" operator.As for how to convert onNext events to onCompleted events, there are a few ways to do that. The CompositeDisposable method sounds like a good approach, and a bit of searching finds this interesting thread about converting between onNext, onError, and onCompleted notifications. I'd probably create an extension method called ReturnTrueOnCompleted using
xs.SkipWhile(_ => true).concat(Observable.Return(True))
and your merge then becomes:You could also look at using an operator like Zip which automatically completes when one of the input streams completes.
假设您不需要任何一个流的输出,您可以将
Amb
与Materialize
中的一些魔法结合使用:如果您需要这些值,您可以使用
做
这两个科目。Assuming you don't need the output of either of the streams, you can use
Amb
combined with some magic fromMaterialize
:If you need the values, you can use
Do
on the two subjects.试试这个:
它的作用是连接一个 IObservable,当源或正确的源完成时,该 IObservable 将抛出异常。然后,我们可以使用 Catch 扩展方法返回一个空的 Observable,以便在任一完成时自动完成流。
Try this:
What this does is concatenate an IObservable that will throw an exception when either the source or the right source completes. We can then use the Catch extension method to return an empty Observable to automatically complete the stream when either completes.