如何合并两个 Observables,以便当任何一个 Observables 完成时结果也完成?

发布于 2024-10-15 17:49:05 字数 466 浏览 16 评论 0原文

我有这样的代码:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

我已经使用 OnError(new OperationCanceledException()) 解决了我的问题,但我想要一个更好的解决方案(必须有一个组合器,对吧?)。

I have this code:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

I've solved my problem using OnError(new OperationCanceledException()), but I'd like a better solution (there has to be a combinator right?).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

梦毁影碎の 2024-10-22 17:49:05

或者这个,这也很简洁:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

它使用一个主题,它将确保只有一个 OnCompleted 被推送到 CreateWithDisposable() 中的观察者;

Or this, which is also quite neat:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

This uses a subject which will make sure only one OnCompleted is pushed to the observer in the CreateWithDisposable();

能否归途做我良人 2024-10-22 17:49:05

我建议将 onCompleted 事件转换为 onNext 事件并使用 var ss = s1.Merge(s2).TakeUntil(s1ors2complete) ,其中 s1ors2complete 生成一个值,而不是在任一流完成时重写 Merge 以完成当 s1 或 s2 结束时。您也可以仅链接 .TakeUntil(s1completes).TakeUntil(s2completes) 而不是创建 s1ors2complete。此方法提供了比 MergeWithCompleteOnEither 扩展更好的组合,因为它可用于将任何“两个完成时完成”运算符修改为“任何一个完成时完成”运算符。

至于如何将 onNext 事件转换为 onCompleted 事件,有几种方法可以实现。 CompositeDisposable 方法听起来是一个不错的方法,经过一些搜索发现这个有趣的线程关于 在 onNext、onError 和 onCompleted 通知之间转换。我可能会使用 xs.SkipWhile(_ => true).concat(Observable.Return(True)) 创建一个名为 ReturnTrueOnCompleted 的扩展方法,然后您的合并将变为:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

您还可以查看使用像 Zip 这样的运算符 当输入流之一完成时自动完成

Instead of re-writing Merge to finish when either stream completes I would suggest converting the onCompleted events to onNext events and using var ss = s1.Merge(s2).TakeUntil(s1ors2complete) where s1ors2complete produces a value when either s1 or s2 ends. You could also just chain .TakeUntil(s1completes).TakeUntil(s2completes) instead of creating s1ors2complete. This approach provides better composition than a MergeWithCompleteOnEither extension as it can be used to modify any "complete when both complete" operator into a "complete when any completes" operator.

As for how to convert onNext events to onCompleted events, there are a few ways to do that. The CompositeDisposable method sounds like a good approach, and a bit of searching finds this interesting thread about converting between onNext, onError, and onCompleted notifications. I'd probably create an extension method called ReturnTrueOnCompleted using xs.SkipWhile(_ => true).concat(Observable.Return(True)) and your merge then becomes:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

You could also look at using an operator like Zip which automatically completes when one of the input streams completes.

深爱成瘾 2024-10-22 17:49:05

假设您不需要任何一个流的输出,您可以将 AmbMaterialize 中的一些魔法结合使用:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

如果您需要这些值,您可以使用 这两个科目。

Assuming you don't need the output of either of the streams, you can use Amb combined with some magic from Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

If you need the values, you can use Do on the two subjects.

鸠魁 2024-10-22 17:49:05

试试这个:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

它的作用是连接一个 IObservable,当源或正确的源完成时,该 IObservable 将抛出异常。然后,我们可以使用 Catch 扩展方法返回一个空的 Observable,以便在任一完成时自动完成流。

Try this:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

What this does is concatenate an IObservable that will throw an exception when either the source or the right source completes. We can then use the Catch extension method to return an empty Observable to automatically complete the stream when either completes.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文