Rx 扩展:如何使一个订阅依赖于另一个订阅?

发布于 2024-11-24 14:38:45 字数 2810 浏览 0 评论 0原文

我有一个类,它在其构造函数中接受一个可观察的对象,然后订阅它并执行一些操作,设置属性等。该类本身是可观察的。

仅当有人订阅了我的课程时,我才想订阅我的可观察源,但我不知道该怎么做。

public MyClass : IObservable<MyResult>
{
    private readonly Subject<MyResult> _subject = new Subject<MyResult>();
    private readonly IConnectableObservable<MySource> _source;

    public MyClass(IObservable<MySource> source)
    {
         _source = source
             //All my logic to set properties and such
             //goes here as a side effect, instead of in a subscription...
             .Do(...)
             //I hope that by publishing, side effects will happen only once...
             .Publish();
    }

    public IDisposable Subscribe(IObserver<MyResult> observer)
    {
        return new CompositeDisposable(
             _source.Subscribe(/* 
                  don't have anything to do here,
                  just subscribing to make sure I'm subscribed to source...
                  (this can't be the right way to do it)
             */),
             _subject.Subscribe(observer));
    }
}

更新

@Scott:我明白为什么实现 IObservable 将是一种反模式。 My Class 需要使用单个可观察量,并公开 3 个作为属性(最初最常用的可观察量将由 MyClass 本身返回,但我认为拥有它作为一个属性可能会更好。

我想写的是一个可观察的 ICommand,我知道一些存在,但这更多的是学习 Rx 的一种方式......

public class ObservableCommand<T> : ICommand
{
    private readonly ISubject<T> _executeRequests = new Subject<T>();
    private readonly ISubject<T> _canExecuteRequests = new Subject<T>();

    public IObservable<bool> CanExecuteChanges { get; private set; }
    public IObservable<T> CanExecuteRequests { get; private set; }
    public IObservable<T> ExecuteRequests { get; private set; }

    public ObservableCommand(IObservable<bool> canExecute)
    {
        var source = canExecute.DistinctUntilChanged()

        //How do I dispose of subscription later?
        //I have this fear that I'm going to have a chain of references, 
        //and my entire app will never get GC'd!
        var subscription = source.Subscribe(
            o => {
                if (CanExecuteChanged != null)
                    CanExecuteChanged(this, EventArgs.Empty);
            });

        CanExecuteChanges = source;

        CanExecuteRequests = _canExecuteRequests.AsObservable();

        ExecuteRequests = _executeRequests.AsObservable();
    }

    #region ICommand Members

    public bool  CanExecute(object parameter)
    {
        _canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    public event EventHandler  CanExecuteChanged;

    public void  Execute(object parameter)
    {
        _executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    #endregion
}

I have a class that takes an observable in its constructor, then subscribes to it and does some stuff, sets properties etc. The class itself is observable.

I want to subscribe to my source observable only if someone is subscribed to my class, but I can't figure out how to do it.

public MyClass : IObservable<MyResult>
{
    private readonly Subject<MyResult> _subject = new Subject<MyResult>();
    private readonly IConnectableObservable<MySource> _source;

    public MyClass(IObservable<MySource> source)
    {
         _source = source
             //All my logic to set properties and such
             //goes here as a side effect, instead of in a subscription...
             .Do(...)
             //I hope that by publishing, side effects will happen only once...
             .Publish();
    }

    public IDisposable Subscribe(IObserver<MyResult> observer)
    {
        return new CompositeDisposable(
             _source.Subscribe(/* 
                  don't have anything to do here,
                  just subscribing to make sure I'm subscribed to source...
                  (this can't be the right way to do it)
             */),
             _subject.Subscribe(observer));
    }
}

UPDATE

@Scott: I can see why implementing IObservable would be an anti-pattern. My Class needs to consume a single observable, and exposes 3 as properties (originally the most commonly used observable was going to be returned by MyClass itself, but I think that having it as a property might be better.

What I'm trying to write is an observable ICommand. I know some exist, but this is more of a way to learn Rx...

public class ObservableCommand<T> : ICommand
{
    private readonly ISubject<T> _executeRequests = new Subject<T>();
    private readonly ISubject<T> _canExecuteRequests = new Subject<T>();

    public IObservable<bool> CanExecuteChanges { get; private set; }
    public IObservable<T> CanExecuteRequests { get; private set; }
    public IObservable<T> ExecuteRequests { get; private set; }

    public ObservableCommand(IObservable<bool> canExecute)
    {
        var source = canExecute.DistinctUntilChanged()

        //How do I dispose of subscription later?
        //I have this fear that I'm going to have a chain of references, 
        //and my entire app will never get GC'd!
        var subscription = source.Subscribe(
            o => {
                if (CanExecuteChanged != null)
                    CanExecuteChanged(this, EventArgs.Empty);
            });

        CanExecuteChanges = source;

        CanExecuteRequests = _canExecuteRequests.AsObservable();

        ExecuteRequests = _executeRequests.AsObservable();
    }

    #region ICommand Members

    public bool  CanExecute(object parameter)
    {
        _canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    public event EventHandler  CanExecuteChanged;

    public void  Execute(object parameter)
    {
        _executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    #endregion
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

自由范儿 2024-12-01 14:38:45

是否不在构造函数中执行或发布,而是在订阅方法中执行或发布怎么样?

应该说,显式实现 IObservable是一种 Rx 反模式。

您可以使用 DeferCreate 使订阅依赖于其他订阅者,例如

IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect =  Observable.Defer(() =>
{
   // Do something interesting on Subscription
   // ....
   return source;
});

How about just not Doing or Publishing in the constructor, but rather in the Subscribe method?

It should be said, explicitly implementing IObservable<T> is something of an Rx anti-pattern.

You can make Subscriptions dependent on other subscribers with Defer and Create, something like

IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect =  Observable.Defer(() =>
{
   // Do something interesting on Subscription
   // ....
   return source;
});
拿命拼未来 2024-12-01 14:38:45

我已经给你准备了一张截图。 MyClass 实现了 IObservable 并且还具有 IObserver 的方法,但它们都是私有的。通过附加的 OnInitializeOnSubscribe,您应该能够对您想要响应的任何事件执行任何操作。

如果您想让这个片段可重用,您可以将所有方法定义为 partial,因为它们都返回 void。然后你可以创建任何你想要的定义。

public class MyClass<T> : IObservable<T>
{
    private readonly IObservable<T> m_Source;

    public MyClass(IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        m_Source = source.Do(OnNext, OnError, OnCompleted);
        OnInitialize();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        OnSubscribe();
        return m_Source.Subscribe(observer);
    }

    private void OnInitialize()
    {
        Console.WriteLine("OnInitialize");
    }
    private void OnSubscribe()
    {
        Console.WriteLine("OnSubscribe");
    }
    private void OnNext(T value)
    {
        Console.WriteLine("OnNext: {0}", value);
    }
    private void OnError(Exception error)
    {
        Console.WriteLine("OnError: {0}", error.Message);
    }
    private void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }    
}

I've prepared a snipped for you. MyClass implements IObservable<T> and has also methods of IObserver<T> but they are all private. With additional OnInitialize and OnSubscribe you should be able to do whatever you want on any event you want to response to.

If you want to make this snipped reusable you could define all methods as partial as they all return void. Then you could create definition to whatever you want.

public class MyClass<T> : IObservable<T>
{
    private readonly IObservable<T> m_Source;

    public MyClass(IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        m_Source = source.Do(OnNext, OnError, OnCompleted);
        OnInitialize();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        OnSubscribe();
        return m_Source.Subscribe(observer);
    }

    private void OnInitialize()
    {
        Console.WriteLine("OnInitialize");
    }
    private void OnSubscribe()
    {
        Console.WriteLine("OnSubscribe");
    }
    private void OnNext(T value)
    {
        Console.WriteLine("OnNext: {0}", value);
    }
    private void OnError(Exception error)
    {
        Console.WriteLine("OnError: {0}", error.Message);
    }
    private void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }    
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文