IObservable - 如何发送/发布/推送新值到集合

发布于 2024-10-20 03:05:47 字数 838 浏览 4 评论 0原文

我想从我的服务层公开 IObservable。

为了简单起见,我们假设服务层在内部从远程服务器(通过套接字)获取消息,并且套接字库需要一个 IMessageReponse 对象,该对象具有要传递给它的 MessageReceived 方法。

在内部,服务层创建一个 MessageResponse 对象,并在消息到达时通过 Action 回调获得通知。

鉴于这种设计,我需要能够将新消息推送到 IObservable,但在我见过的任何示例中,Observable.XYZ 似乎不支持简单的发送/发布/推送方法...

我如何接线我的 Observable.XYZ 在这种情况下???

我想要这样的东西...注意我知道这是 IObservable 的一个非常基本的实现,但我没想到我需要自己编写这段代码...我本以为会有一些东西适合我开箱即用。

public class PushObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _listeners = new List<IObserver<T>>();

    public void Send(T value)
    {
        foreach (var listener in _listeners) 
            listener.OnNext(value); 
    }

    public IDisposable Subscribe(IObserver<T> observer)
    { 
        _listeners.Add(observer);
    }
}

I want to expose an IObservable from my service layer.

For simplicity lets say that internally the service layer is getting Message from a remote server (via a socket) and that the socket library requires an object of IMessageReponse that has a MessageReceived method to be passed to it.

Internally the service layer creates a MessageResponse object and get notified by a Action callback when a message arrives.

Given this design I need to be able to push new messages to the IObservable but in any of the examples I've seen, Observable.XYZ doesn't seem to support a simple Send/Publish/Push method...

How do I wireup my Observable.XYZ in this scenario???

I want something like this... note I know this is a very basic implementation of IObservable, but I wouldn't have thought I would need to write this code myself... I would have thought that something would have been there for me out of the box.

public class PushObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _listeners = new List<IObserver<T>>();

    public void Send(T value)
    {
        foreach (var listener in _listeners) 
            listener.OnNext(value); 
    }

    public IDisposable Subscribe(IObserver<T> observer)
    { 
        _listeners.Add(observer);
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

肥爪爪 2024-10-27 03:05:47

您重写了一个已经存在的对象!您的“PushObservable”实际上是 Subject,它是 Rx 中的基本对象之一。

如果您确实想以 Rx 方式考虑这个问题,您可能会从来自套接字的 IObservable开始,然后您会将其选择为 IObservable,因为最终,您响应的事件是来自网络的字节。

You have rewritten an object that already exists! Your "PushObservable" is actually Subject<T>, and it's one of the fundamental objects in Rx.

If you really want to think about this problem in an Rx way, you'd probably start with an IObservable<byte[]> that comes from the socket, then you would Select this into an IObservable<IMessageResponse>, since at the end of the day, the event you're responding to is bytes coming off the wire.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文