如何不断重试反应式方法直到成功?

发布于 2024-12-10 06:42:24 字数 2984 浏览 0 评论 0原文

这是我的 WebClient 异步下载反应式扩展。 一次又一次调用“DownloadStringAsync”直到操作成功的最佳方法是什么?

像这样但以反应方式:

while (true)
{
  var result = DownloadStringAsync();
  if (result)
  {
    return;
  }
}

我的代码:

[Serializable]
public class WebClientException : Exception
{
    public WebClientResponse Response { get; set; }

    public WebClientException()
    {
    }

    public WebClientException(string message)
        : base(message)
    {
    }

    public WebClientException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    protected WebClientException(SerializationInfo info, StreamingContext context)
        : base(info, context)
    {
    }
}

public class WebClientResponse
{
    public WebHeaderCollection Headers { get; set; }
    public HttpStatusCode StatusCode { get; set; }
    public string Result { get; set; }
    public WebException Exception { get; set; }
}

public static IObservable<WebClientResponse> DownloadStringAsync(this WebClient webClient, Uri address, WebHeaderCollection requestHeaders)
{
    var asyncResult =
        Observable.FromEventPattern<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>
            (ev => webClient.DownloadStringCompleted += ev, ev => webClient.DownloadStringCompleted -= ev)
            .ObserveOn(Scheduler.TaskPool)
            .Select(o =>
                        {
                            var ex = o.EventArgs.Error as WebException;

                            if (ex == null)
                            {
                                var wc = (WebClient) o.Sender;

                                return new WebClientResponse {Headers = wc.ResponseHeaders, Result = o.EventArgs.Result};
                            }

                            var wcr = new WebClientResponse {Exception = ex};

                            var r = ex.Response as HttpWebResponse;
                            if (r != null)
                            {
                                wcr.Headers = r.Headers;
                                wcr.StatusCode = r.StatusCode;

                                var s = r.GetResponseStream();
                                if (s != null)
                                {
                                    using (TextReader tr = new StreamReader(s))
                                    {
                                        wcr.Result = tr.ReadToEnd();
                                    }
                                }
                            }

                            throw new WebClientException {Response = wcr};
                        })
            .Take(1);

    if (requestHeaders != null)
    {
        foreach (var key in requestHeaders.AllKeys)
        {
            webClient.Headers.Add(key, requestHeaders[key]);
        }
    }

    webClient.DownloadStringAsync(address);

    return asyncResult;
}

Here is my async download reactive extension for WebClient.
What is the best way to recall "DownloadStringAsync" again and again till the operation succeeds?

Something like this but in reactive way:

while (true)
{
  var result = DownloadStringAsync();
  if (result)
  {
    return;
  }
}

MY CODE:

[Serializable]
public class WebClientException : Exception
{
    public WebClientResponse Response { get; set; }

    public WebClientException()
    {
    }

    public WebClientException(string message)
        : base(message)
    {
    }

    public WebClientException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    protected WebClientException(SerializationInfo info, StreamingContext context)
        : base(info, context)
    {
    }
}

public class WebClientResponse
{
    public WebHeaderCollection Headers { get; set; }
    public HttpStatusCode StatusCode { get; set; }
    public string Result { get; set; }
    public WebException Exception { get; set; }
}

public static IObservable<WebClientResponse> DownloadStringAsync(this WebClient webClient, Uri address, WebHeaderCollection requestHeaders)
{
    var asyncResult =
        Observable.FromEventPattern<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>
            (ev => webClient.DownloadStringCompleted += ev, ev => webClient.DownloadStringCompleted -= ev)
            .ObserveOn(Scheduler.TaskPool)
            .Select(o =>
                        {
                            var ex = o.EventArgs.Error as WebException;

                            if (ex == null)
                            {
                                var wc = (WebClient) o.Sender;

                                return new WebClientResponse {Headers = wc.ResponseHeaders, Result = o.EventArgs.Result};
                            }

                            var wcr = new WebClientResponse {Exception = ex};

                            var r = ex.Response as HttpWebResponse;
                            if (r != null)
                            {
                                wcr.Headers = r.Headers;
                                wcr.StatusCode = r.StatusCode;

                                var s = r.GetResponseStream();
                                if (s != null)
                                {
                                    using (TextReader tr = new StreamReader(s))
                                    {
                                        wcr.Result = tr.ReadToEnd();
                                    }
                                }
                            }

                            throw new WebClientException {Response = wcr};
                        })
            .Take(1);

    if (requestHeaders != null)
    {
        foreach (var key in requestHeaders.AllKeys)
        {
            webClient.Headers.Add(key, requestHeaders[key]);
        }
    }

    webClient.DownloadStringAsync(address);

    return asyncResult;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

玩套路吗 2024-12-17 06:42:24

您的方法会生成一个热可观察对象,这意味着它在返回时已经开始加载,并且每个新订阅都不会向 Web 服务器创建新请求。您需要将您的方法包装在另一个方法中并使用 Observable.Create (为了创建一个冷 observable,它会在每个订阅上创建一个新请求):

public static IObservable<WebClientResponse> DownloadStringAsync(this WebClient webClient, Uri address, WebHeaderCollection requestHeaders)
{
    return Observable
        .Create(observer => 
        {
            DownloadStringAsyncImpl(webClient, address, requestHeaders)
                .Subscribe(observer);
            return () => { webClient.CancelAsync(); };
        });
}

这里,DownloadStringAsyncImpl 是您以前的 DownloadStringAsync 实现,而公共方法已被替换。

现在您可以重试异步方法,直到成功,如下所示:

myWebClient
    .DownloadStringAsync( /* args... */)
    .Retry()
    .Subscribe(result => { 
         /* now I've got a result! */
    });

Your method produces a hot observable, which means that it has already started loading when it returns and each new subscription does not create a new request to the web server. You need to wrap your method in another and use Observable.Create (in order to create a cold observable which does create a new request upon each subscription):

public static IObservable<WebClientResponse> DownloadStringAsync(this WebClient webClient, Uri address, WebHeaderCollection requestHeaders)
{
    return Observable
        .Create(observer => 
        {
            DownloadStringAsyncImpl(webClient, address, requestHeaders)
                .Subscribe(observer);
            return () => { webClient.CancelAsync(); };
        });
}

Here, DownloadStringAsyncImpl is your previous implementation of DownloadStringAsync, while the public method has been replaced.

Now you can retry the async method until it succeeds as follows:

myWebClient
    .DownloadStringAsync( /* args... */)
    .Retry()
    .Subscribe(result => { 
         /* now I've got a result! */
    });
霓裳挽歌倾城醉 2024-12-17 06:42:24

我认为你至少有一个像样的“这里是一些代码”答案,所以我将重点关注更一般的手持方式。

我首先要看的是Rx 设计指南。这是一个简短的(34 页)PDF 文档,有助于将范式从拉动“订阅”转变为推送,或从 IEnumerable 转变为 IObservable。

如果您想更进一步,可以使用 .NET 的 PDF HOL(实践实验室) JavaScript。您可以在 Rx 页面上找到其他资源(从这里开始)。

I think you have at least one decent "here is some code" answer, so I will focus on a more general hand holding.

The first thing I would look at is the design guidelines for Rx. It is a short (34 page) PDF document that helps change paradigm from pull "subscriptions" to push, or moving from IEnumerable to IObservable.

If you want to go a bit further, there are PDF HOLs (hands on labs) for both .NET and JavaScript. You can find other resources on the Rx pages (start here).

水波映月 2024-12-17 06:42:24

如果是异步函数。进行重复检查意味着您将其变成了同步函数调用。这是你真正想做的事情吗?

您可以让一个专用线程调用此异步函数,并在调用此函数后阻止自身。创建此线程时,向其传递一个委托,该委托应在异步函数返回后调用。完成后,使用错误代码调用委托。

希望这能回答您的问题。

If it is an async function. Doing a repetitive checking means you turned it into a sync function call. Is this something you really want to do?

You can have a dedicated thread calling this async function and block itself after calling this function. When create this thread, pass it a delegate that should be called after the async function returns. Upon completion, call the delegate with error code.

Hope this answers your question.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文