Rx 扩展:如何使一个订阅依赖于另一个订阅?
我有一个类,它在其构造函数中接受一个可观察的对象,然后订阅它并执行一些操作,设置属性等。该类本身是可观察的。
仅当有人订阅了我的课程时,我才想订阅我的可观察源,但我不知道该怎么做。
public MyClass : IObservable<MyResult>
{
private readonly Subject<MyResult> _subject = new Subject<MyResult>();
private readonly IConnectableObservable<MySource> _source;
public MyClass(IObservable<MySource> source)
{
_source = source
//All my logic to set properties and such
//goes here as a side effect, instead of in a subscription...
.Do(...)
//I hope that by publishing, side effects will happen only once...
.Publish();
}
public IDisposable Subscribe(IObserver<MyResult> observer)
{
return new CompositeDisposable(
_source.Subscribe(/*
don't have anything to do here,
just subscribing to make sure I'm subscribed to source...
(this can't be the right way to do it)
*/),
_subject.Subscribe(observer));
}
}
更新
@Scott:我明白为什么实现 IObservable 将是一种反模式。 My Class
需要使用单个可观察量,并公开 3 个作为属性(最初最常用的可观察量将由 MyClass
本身返回,但我认为拥有它作为一个属性可能会更好。
我想写的是一个可观察的 ICommand,我知道一些存在,但这更多的是学习 Rx 的一种方式......
public class ObservableCommand<T> : ICommand
{
private readonly ISubject<T> _executeRequests = new Subject<T>();
private readonly ISubject<T> _canExecuteRequests = new Subject<T>();
public IObservable<bool> CanExecuteChanges { get; private set; }
public IObservable<T> CanExecuteRequests { get; private set; }
public IObservable<T> ExecuteRequests { get; private set; }
public ObservableCommand(IObservable<bool> canExecute)
{
var source = canExecute.DistinctUntilChanged()
//How do I dispose of subscription later?
//I have this fear that I'm going to have a chain of references,
//and my entire app will never get GC'd!
var subscription = source.Subscribe(
o => {
if (CanExecuteChanged != null)
CanExecuteChanged(this, EventArgs.Empty);
});
CanExecuteChanges = source;
CanExecuteRequests = _canExecuteRequests.AsObservable();
ExecuteRequests = _executeRequests.AsObservable();
}
#region ICommand Members
public bool CanExecute(object parameter)
{
_canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
public event EventHandler CanExecuteChanged;
public void Execute(object parameter)
{
_executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
#endregion
}
I have a class that takes an observable in its constructor, then subscribes to it and does some stuff, sets properties etc. The class itself is observable.
I want to subscribe to my source observable only if someone is subscribed to my class, but I can't figure out how to do it.
public MyClass : IObservable<MyResult>
{
private readonly Subject<MyResult> _subject = new Subject<MyResult>();
private readonly IConnectableObservable<MySource> _source;
public MyClass(IObservable<MySource> source)
{
_source = source
//All my logic to set properties and such
//goes here as a side effect, instead of in a subscription...
.Do(...)
//I hope that by publishing, side effects will happen only once...
.Publish();
}
public IDisposable Subscribe(IObserver<MyResult> observer)
{
return new CompositeDisposable(
_source.Subscribe(/*
don't have anything to do here,
just subscribing to make sure I'm subscribed to source...
(this can't be the right way to do it)
*/),
_subject.Subscribe(observer));
}
}
UPDATE
@Scott: I can see why implementing IObservable would be an anti-pattern. My Class
needs to consume a single observable, and exposes 3 as properties (originally the most commonly used observable was going to be returned by MyClass
itself, but I think that having it as a property might be better.
What I'm trying to write is an observable ICommand. I know some exist, but this is more of a way to learn Rx...
public class ObservableCommand<T> : ICommand
{
private readonly ISubject<T> _executeRequests = new Subject<T>();
private readonly ISubject<T> _canExecuteRequests = new Subject<T>();
public IObservable<bool> CanExecuteChanges { get; private set; }
public IObservable<T> CanExecuteRequests { get; private set; }
public IObservable<T> ExecuteRequests { get; private set; }
public ObservableCommand(IObservable<bool> canExecute)
{
var source = canExecute.DistinctUntilChanged()
//How do I dispose of subscription later?
//I have this fear that I'm going to have a chain of references,
//and my entire app will never get GC'd!
var subscription = source.Subscribe(
o => {
if (CanExecuteChanged != null)
CanExecuteChanged(this, EventArgs.Empty);
});
CanExecuteChanges = source;
CanExecuteRequests = _canExecuteRequests.AsObservable();
ExecuteRequests = _executeRequests.AsObservable();
}
#region ICommand Members
public bool CanExecute(object parameter)
{
_canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
public event EventHandler CanExecuteChanged;
public void Execute(object parameter)
{
_executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
#endregion
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
是否不在构造函数中执行或发布,而是在订阅方法中执行或发布怎么样?
应该说,显式实现 IObservable是一种 Rx 反模式。
您可以使用
Defer
和Create
使订阅依赖于其他订阅者,例如How about just not
Do
ing orPublish
ing in the constructor, but rather in theSubscribe
method?It should be said, explicitly implementing
IObservable<T>
is something of an Rx anti-pattern.You can make Subscriptions dependent on other subscribers with
Defer
andCreate
, something like我已经给你准备了一张截图。
MyClass
实现了IObservable
并且还具有IObserver
的方法,但它们都是私有的。通过附加的OnInitialize
和OnSubscribe
,您应该能够对您想要响应的任何事件执行任何操作。如果您想让这个片段可重用,您可以将所有方法定义为
partial
,因为它们都返回void
。然后你可以创建任何你想要的定义。I've prepared a snipped for you.
MyClass
implementsIObservable<T>
and has also methods ofIObserver<T>
but they are all private. With additionalOnInitialize
andOnSubscribe
you should be able to do whatever you want on any event you want to response to.If you want to make this snipped reusable you could define all methods as
partial
as they all returnvoid
. Then you could create definition to whatever you want.