使用iObservable实现异步方法

发布于 2024-10-29 15:25:23 字数 676 浏览 2 评论 0原文

使用返回 IObservable 的方法来实现标准 Begin/End 异步模式的替代方案是否合理?在下面的代码中,我使用 Rx 包装旧版 API,以为其提供更好的接口。

API 上的 Messages 属性是一个热门 IObservable。我的问题是,我认为如果我在返回过滤响应的 IObservable 之前收到响应消息(即 m.RequestId == requestId ),订阅者将看不到它并且不会知道操作已完成。

有没有办法正确地做到这一点?

    public IObservable<bool> DoAsyncRequest()
    {
        Observable.Defer(delegate
        {
            int requestId = GenerateRequestId();

            this.api.DoApiRequest(requestId);

            return this.api.Messages
                .Where(m => m.RequestId == requestId)
                .Take(1)
                .Timeout(DefaultTimeout);

        });
    }

Is it reasonable to use a method that returns IObservable to implement an alternative to the standard Begin/End asynchronous pattern? In the following code I'm wrapping a legacy API using Rx to provide a nicer interface to it.

The Messages property on the API is a hot IObservable. My problem is that I think if I get the response message (i.e. m.RequestId == requestId) before I return the IObservable that filters responses, the subscriber will not see it and will not know that the operation completed.

Is there a way to do this properly?

    public IObservable<bool> DoAsyncRequest()
    {
        Observable.Defer(delegate
        {
            int requestId = GenerateRequestId();

            this.api.DoApiRequest(requestId);

            return this.api.Messages
                .Where(m => m.RequestId == requestId)
                .Take(1)
                .Timeout(DefaultTimeout);

        });
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

川水往事 2024-11-05 15:25:23

首先,返回 IObservable 来实现异步方法绝对没问题。

其次,如果是 DoApiRequest 方法启动通过 Messages 发出值的进程,您可以先订阅消息,然后再使用 CreateWithDisposable 调用该方法>

public IObservable<bool> DoAsyncRequest()
{
    return Observable.CreateWithDisposable<bool>(observer =>
        {    
            var disposable = this.api.Messages
                .Where(m => m.RequestId == requestId)
                .Take(1)
                .Timeout(DefaultTimeout);
                .Subscribe(observer);

            int requestId = GenerateRequestId();

            this.api.DoApiRequest(requestId);

            return disposable;

        });
}

或者,由于您似乎可以访问 api 对象的任何类,因此您也可以更改 DoApiRequest 以返回冷可观察值,即可能废弃 Messages 属性,并在 DoApiRequest 方法上返回 IObservable。

注意

如果我正确地假设 DoApiRequest 方法确实启动了通过消息发出值的过程,那么如果多次调用此方法,您将得到奇怪的结果。一个调用可能会收到原本要为另一调用返回的消息。如果可以的话,我会亲自研究替代解决方案。

Firstly it's absolutely fine to return an IObservable to implement asynchronous methods.

Secondly, if it's the DoApiRequest method that kicks off the process that emits values through Messages, you could subscribe to messages first before calling the method using CreateWithDisposable

public IObservable<bool> DoAsyncRequest()
{
    return Observable.CreateWithDisposable<bool>(observer =>
        {    
            var disposable = this.api.Messages
                .Where(m => m.RequestId == requestId)
                .Take(1)
                .Timeout(DefaultTimeout);
                .Subscribe(observer);

            int requestId = GenerateRequestId();

            this.api.DoApiRequest(requestId);

            return disposable;

        });
}

Alternatively, as you seem to have access to whatever class the api object is, you could also change DoApiRequest to return a cold observable i.e. possibly scrap the Messages property and instead return the IObservable on the DoApiRequest method.

Note

If I am correct in assuming that DoApiRequest method does kick off the process that emits values through Messages, then you're going to get odd results if this method is called multiple times. One call could get messages that were intended to be returned for another call. I would personally look into the alternative solution if you are able to.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文