IObservable - 如何发送/发布/推送新值到集合
我想从我的服务层公开 IObservable。
为了简单起见,我们假设服务层在内部从远程服务器(通过套接字)获取消息,并且套接字库需要一个 IMessageReponse 对象,该对象具有要传递给它的 MessageReceived 方法。
在内部,服务层创建一个 MessageResponse 对象,并在消息到达时通过 Action 回调获得通知。
鉴于这种设计,我需要能够将新消息推送到 IObservable,但在我见过的任何示例中,Observable.XYZ 似乎不支持简单的发送/发布/推送方法...
我如何接线我的 Observable.XYZ 在这种情况下???
我想要这样的东西...注意我知道这是 IObservable 的一个非常基本的实现,但我没想到我需要自己编写这段代码...我本以为会有一些东西适合我开箱即用。
public class PushObservable<T> : IObservable<T>
{
private IList<IObserver<T>> _listeners = new List<IObserver<T>>();
public void Send(T value)
{
foreach (var listener in _listeners)
listener.OnNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
_listeners.Add(observer);
}
}
I want to expose an IObservable from my service layer.
For simplicity lets say that internally the service layer is getting Message from a remote server (via a socket) and that the socket library requires an object of IMessageReponse that has a MessageReceived method to be passed to it.
Internally the service layer creates a MessageResponse object and get notified by a Action callback when a message arrives.
Given this design I need to be able to push new messages to the IObservable but in any of the examples I've seen, Observable.XYZ doesn't seem to support a simple Send/Publish/Push method...
How do I wireup my Observable.XYZ in this scenario???
I want something like this... note I know this is a very basic implementation of IObservable, but I wouldn't have thought I would need to write this code myself... I would have thought that something would have been there for me out of the box.
public class PushObservable<T> : IObservable<T>
{
private IList<IObserver<T>> _listeners = new List<IObserver<T>>();
public void Send(T value)
{
foreach (var listener in _listeners)
listener.OnNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
_listeners.Add(observer);
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您重写了一个已经存在的对象!您的“PushObservable”实际上是
Subject
,它是 Rx 中的基本对象之一。如果您确实想以 Rx 方式考虑这个问题,您可能会从来自套接字的 IObservable开始,然后您会将其选择为
IObservable
,因为最终,您响应的事件是来自网络的字节。You have rewritten an object that already exists! Your "PushObservable" is actually
Subject<T>
, and it's one of the fundamental objects in Rx.If you really want to think about this problem in an Rx way, you'd probably start with an
IObservable<byte[]>
that comes from the socket, then you would Select this into anIObservable<IMessageResponse>
, since at the end of the day, the event you're responding to is bytes coming off the wire.