Reactive Extensions cleanup

Posted 2024-10-08 20:44:58

If you have a long chain of calls using Rx, such as:

var responses = collectionOfHttpRequests.ToObservable()
.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)
.Select(res => res.GetResponseBodyString()) // Extension method to get the body of the request
.Subscribe();

and you call Dispose before the operation completes, will the HTTP requests be cancelled, closed, and disposed of properly, or do I have to somehow select the HttpWebRequests out of the method chain and dispose of them individually?

I have a scenario where several HTTP requests can be in flight at once, and I need to be able to cancel (not just ignore) some or all of them to save network traffic.

Comments (2)

临走之时 2024-10-15 20:44:58

The Rx operator chain will clean itself up when the sequence completes, errors, or the subscription is disposed. However, each operator only cleans up what it is expected to clean up. For example, FromEvent will unsubscribe from the event.

In your case, cancellation is not supported by the Begin/End asynchronous pattern, so there is nothing for Rx to cancel. You can, however, use Finally to call HttpWebRequest.Abort.

var observableRequests = collectionOfHttpRequests.ToObservable();

var responses = observableRequests
    .SelectMany(req => 
        Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
    )
    .Select(resp => resp.GetResponseBodyString())
    .Finally(() =>
    {
        observableRequests
            .Subscribe(req => req.Abort());
    })
    .Subscribe();
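
To illustrate the point that each operator cleans up only what it owns, here is a minimal sketch, assuming the System.Reactive package is referenced. CleanupDemo and its log are hypothetical names used only for illustration, and no HTTP request is involved. Disposing the subscription runs both the disposable returned from Observable.Create and the Finally action, which is why a call such as HttpWebRequest.Abort has to be wired in explicitly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CleanupDemo
{
    // Hypothetical demo: records which cleanup hooks run when a
    // subscription is disposed before the sequence completes.
    public static List<string> Run()
    {
        var log = new List<string>();

        // A source that never produces a value; it only owns one resource,
        // represented by the disposable it returns.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            log.Add("subscribed");
            return Disposable.Create(() => log.Add("source cleaned up"));
        });

        IDisposable subscription = source
            .Finally(() => log.Add("finally ran"))
            .Subscribe();

        // Unsubscribe early, as the question describes.
        subscription.Dispose();
        return log;
    }
}
```

Anything the operators do not own, such as an in-flight HttpWebRequest, stays untouched unless one of these hooks aborts it.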

听风念你 2024-10-15 20:44:58

I can't accept Richard Szalay's solution. If you start 100 requests and the second one fails (with a server-unavailable error, for example), the remaining 98 requests will be aborted. The second problem is how the UI is supposed to react to such an observable. Not well.

Keeping in mind chapter 4.3 of the Rx Design Guidelines, I wanted to express the WebRequest observable via the Observable.Using() operator. But WebRequest is not disposable! So I defined DisposableWebRequest:

using System;
using System.Diagnostics;
using System.Net;
using System.Threading;

public class DisposableWebRequest : WebRequest, IDisposable
{
    private static int _Counter = 0;

    private readonly WebRequest _request;
    private readonly int _index;

    private volatile bool _disposed = false;

    public DisposableWebRequest (string url)
    {
        this._request = WebRequest.Create(url);
        this._index = ++DisposableWebRequest._Counter;
    }

    public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state)
    {
        return this._request.BeginGetResponse(callback, state);
    }

    public override WebResponse EndGetResponse(IAsyncResult asyncResult)
    {
        Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId));
        Trace.Flush();
        if (!this._disposed)
        {
            return this._request.EndGetResponse(asyncResult);
        }
        else
        {
            return null;
        }
    }

    public override WebResponse GetResponse()
    {
        return this._request.GetResponse();
    }

    public override void Abort()
    {
        this._request.Abort();
    }

    public void Dispose()
    {
        if(!this._disposed)
        {
            this._disposed = true;

            Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId ));
            Trace.Flush();
            this.Abort();
        }
    }
}

Then I created a WPF window and put two buttons on it (Start and Stop).

Now let's build a proper observable collection of requests.
First, define a function that creates the observable of URLs:

        Func<IObservable<string>> createUrlObservable = () =>
            Observable
                .Return("http://yahoo.com")
                .Repeat(100)
                .OnStartup(() =>
                {
                    this._failed = 0;
                    this._successed = 0;
                });

For each URL we create a web request observable:

        Func<string, IObservable<WebResponse>> createRequestObservable = 
            url => 
            Observable.Using(
                () => new DisposableWebRequest(url),
                r =>
                {
                    Trace.WriteLine("Queued " + url);
                    Trace.Flush();
                    return Observable
                        .FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)();
                });
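
The reason for routing the request through Observable.Using can be seen in isolation with a small sketch, assuming the System.Reactive package. UsingDemo is a hypothetical name, and a plain flag stands in for the web request: the resource is created on subscription and disposed when the subscription ends, which in the code above is the moment DisposableWebRequest.Dispose calls Abort.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class UsingDemo
{
    // Hypothetical demo: returns true when the resource created by
    // Observable.Using is disposed only after the subscription is disposed.
    public static bool ResourceDisposedOnUnsubscribe()
    {
        bool disposed = false;

        IObservable<int> sequence = Observable.Using(
            // Resource factory: runs once per subscription.
            () => Disposable.Create(() => disposed = true),
            // The sequence tied to the resource; Never keeps it open.
            _ => Observable.Never<int>());

        IDisposable subscription = sequence.Subscribe();
        bool disposedBeforeUnsubscribe = disposed;

        subscription.Dispose();
        return !disposedBeforeUnsubscribe && disposed;
    }
}
```

Because every subscription gets its own resource, ending one request's subscription leaves the other requests alone.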

In addition, define two event observables that react to clicks on the Start and Stop buttons:

        var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click");
        var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click");

So the bricks are ready; time to compose them (I do it in the view's constructor, right after InitializeComponent()):

        startMouseDown
            .SelectMany(down =>
                createUrlObservable()
                    // Per-request pipeline: one of these per URL.
                    .SelectMany(url => createRequestObservable(url)
                        // Stop ends the sequence, which disposes (aborts) the request.
                        .TakeUntil(stopMouseDown)
                        .Select(r => r.GetResponseStream())
                        .Do(s =>
                            {
                                using (var sr = new StreamReader(s))
                                {
                                    Trace.WriteLine(sr.ReadLine());
                                    Trace.Flush();
                                }
                            })
                        .Do(r => this._successed++)
                        .HandleError(e => this._failed++))
                    // Merged results of all requests: update the UI on the dispatcher.
                    .ObserveOnDispatcher()
                    .Do(_ => this.RefresLabels(),
                        e => { },
                        () => this.RefresLabels()))
            .Subscribe();

You may wonder about the HandleError() function. If an exception occurs in the EndGetResponse() call (I turned off the network connection to reproduce it) and is not caught in the request observable, it will crash the startMouseDown observable. HandleError catches the exception silently, performs the provided action, and calls OnCompleted on the next observer instead of OnError:

public static class ObservableExtensions
{
    public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler)
    {
        return Observable.CreateWithDisposable<TSource>(observer =>
            {
                return source.Subscribe(observer.OnNext, 
                    e => 
                    { 
                        errorHandler(e);
                        //observer.OnError(e);
                        observer.OnCompleted();
                    },
                    observer.OnCompleted);
            });
    }
}
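
With current Rx releases a similar effect can probably be had from built-in operators. The following is a sketch, assuming the System.Reactive package and its Catch and Empty operators. CompleteOnError is a hypothetical name; it is not part of Rx or of the code above.

```csharp
using System;
using System.Reactive.Linq;

public static class ErrorSwallowingExtensions
{
    // Hypothetical counterpart to HandleError: runs the handler, then
    // completes the sequence instead of forwarding OnError downstream.
    public static IObservable<T> CompleteOnError<T>(
        this IObservable<T> source, Action<Exception> onError)
    {
        return source.Catch<T, Exception>(e =>
        {
            onError(e);
            // Continue with an empty sequence, so observers see OnCompleted.
            return Observable.Empty<T>();
        });
    }
}
```

Used inside the per-request pipeline, a failing request then ends quietly and does not tear down the outer SelectMany.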

The last unexplained piece is the RefresLabels method, which updates the UI controls:

    private void RefresLabels()
    {
        this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed);
        this.FailedLabel.Content = string.Format("Failed {0}", this._failed);
    }