"Throttled" async download in F#

Posted 2024-11-13 05:39:05

I'm trying to download the 3000+ photos referenced from the XML backup of my blog. The problem I came across is that if just one of those photos is no longer available, the whole async workflow gets blocked, because AsyncGetResponse doesn't time out.

ildjarn helped me put together a version of AsyncGetResponse that does fail on timeout, but using it produces a lot more timeouts, as though requests that are merely queued also time out. It seems that all the WebRequests are launched 'immediately', and the only way to make it work is to set the timeout to the time required to download all of them combined. That isn't great, because it means I have to adjust the timeout depending on the number of images.

Have I reached the limits of vanilla async? Should I be looking at reactive extensions instead?

This is a bit embarrassing, because I've already asked two questions here on this particular bit of code, and I still haven't got it working the way I want!

Comments (4)

夏夜暖风 2024-11-20 05:39:05

I think there must be a better way to find out that a file is not available than using a timeout. I'm not exactly sure, but is there some way to make it throw an exception if a file cannot be found? Then you could just wrap your async code inside try .. with and you should avoid most of the problems.
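As a rough illustration of the try .. with idea, here is a minimal sketch. The helper name `tryFetch` is hypothetical (it does not appear in the question), and it assumes a missing file surfaces as an exception from the download workflow rather than as a hang:

```fsharp
/// Hypothetical helper: runs 'work' and captures any exception it raises
/// instead of letting it tear down the surrounding workflow.
let tryFetch (work: Async<'a>) : Async<Result<'a, exn>> = async {
    try
        let! result = work
        return Ok result
    with ex ->
        // e.g. a WebException for a photo that no longer exists
        return Error ex }
```

Each download can then be wrapped individually, so one failed photo yields an `Error` value while the others carry on.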

Anyway, if you want to write your own "concurrency manager" that runs a certain number of requests in parallel and queues the remaining pending requests, then the easiest option in F# is to use agents (the MailboxProcessor type). The following object encapsulates the behavior:

type ThrottlingAgentMessage = 
  | Completed
  | Work of Async<unit>

/// Represents an agent that runs operations concurrently. When the number
/// of concurrent operations reaches 'limit', further work is queued and processed later
type ThrottlingAgent(limit) = 
  let agent = MailboxProcessor.Start(fun agent -> 
    /// Represents a state when the agent is blocked
    let rec waiting () = 
      // Use 'Scan' to wait for completion of some work;
      // 'Work' messages stay in the mailbox until a slot frees up
      agent.Scan(function
        | Completed -> Some(working (limit - 1))
        | _ -> None)
    /// Represents a state when the agent is working
    /// ('count' is the number of work items currently running)
    and working count = async { 
      // Receive any message 
      let! msg = agent.Receive()
      match msg with 
      | Completed -> 
          // Decrement the counter of work items
          return! working (count - 1)
      | Work work ->
          // Start the work item & continue in blocked/working state
          async { try do! work 
                  finally agent.Post(Completed) }
          |> Async.Start
          // 'count + 1' items are now running; block once the limit is reached
          if count + 1 < limit then return! working (count + 1)
          else return! waiting () }
    working 0)      

  /// Queue the specified asynchronous workflow for processing
  member x.DoWork(work) = agent.Post(Work work)

稚气少女 2024-11-20 05:39:05

Nothing is ever easy. :)

I think the issues you're hitting are intrinsic to the problem domain (as opposed to merely being issues with the async programming model, though they do interact somewhat).

Say you want to download 3000 pictures. First, in your .NET process, there is a setting (System.Net.ServicePointManager.DefaultConnectionLimit, if I remember the name right) that throttles the number of simultaneous HTTP connections your .NET process can open to a given host (and the default is just '2', I think). So you could find that control and set it to a higher number, and it would help.
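For reference, raising that limit is a one-line change. This is only a sketch: the value 10 is an arbitrary assumption, and the setting applies to the WebRequest-based stack used in the question, so it has to be set before the requests are created.

```fsharp
open System.Net

// Assumption: 10 simultaneous connections per host is acceptable for the server.
// Must run before any WebRequest to that host is created.
ServicePointManager.DefaultConnectionLimit <- 10
```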

But then next, your machine and internet connection have finite bandwidth. So even if you could try to concurrently start 3000 HTTP connections, each individual connection would get slower based on the bandwidth pipe limitations. So this would also interact with timeouts. (And this doesn't even consider what kinds of throttles/limits are on the server. Maybe if you send 3000 requests it will think you are DoS attacking and blacklist your IP.)

So this is really a problem domain where a good solution requires some intelligent throttling and flow-control in order to manage how the underlying system resources are used.

As in the other answer, F# agents (MailboxProcessors) are a good programming model for authoring such throttling/flow-control logic.

(Even with all that, if most picture files are like 1MB but then there is a 1GB file mixed in there, that single file might trip a timeout.)
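To make the interaction with timeouts concrete, here is a minimal sketch of a per-job timeout built on Async.StartChild. The helper name `withTimeout` is hypothetical, and this is not the AsyncGetResponse variant mentioned in the question. The point is that the clock starts only when the wrapped workflow actually starts running, so if it is combined with a throttle, time spent waiting in the queue does not count against the timeout.

```fsharp
open System

/// Hypothetical helper: runs 'work' and returns None if it has not
/// finished within 'timeoutMs' milliseconds of being started.
let withTimeout (timeoutMs: int) (work: Async<'a>) : Async<'a option> = async {
    try
        // StartChild with a timeout raises TimeoutException
        // when the child computation overruns
        let! child = Async.StartChild(work, timeoutMs)
        let! result = child
        return Some result
    with :? TimeoutException ->
        return None }
```

A single very large file can still trip the timeout, as noted above, so the value has to be chosen with the largest expected download in mind.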

Anyway, this is not so much an answer to the question, as just pointing out how much intrinsic complexity there is in the problem domain itself. (Perhaps it's also suggestive of why UI 'download managers' are so popular.)

余厌 2024-11-20 05:39:05

Here's a variation on Tomas's answer, because I needed an agent which could return results.

type ThrottleMessage<'a> = 
    | AddJob of (Async<'a>*AsyncReplyChannel<'a>) 
    | DoneJob of ('a*AsyncReplyChannel<'a>) 
    | Stop

/// This agent accumulates 'jobs' but limits the number which run concurrently.
type ThrottleAgent<'a>(limit) =    
    let agent = MailboxProcessor<ThrottleMessage<'a>>.Start(fun inbox ->
        let rec loop(jobs, count) = async {
            let! msg = inbox.Receive()  //get next message
            match msg with
            | AddJob(job) -> 
                if count < limit then   //if not at limit, we work, else loop
                    return! work(job::jobs, count)
                else
                    return! loop(job::jobs, count)
            | DoneJob(result, reply) -> 
                reply.Reply(result)           //send back result to caller
                return! work(jobs, count - 1) //no need to check limit here
            | Stop -> return () }
        and work(jobs, count) = async {
            match jobs with
            | [] -> return! loop(jobs, count) //if no jobs left, wait for more
            | (job, reply)::jobs ->          //run job, post Done when finished
                async { let! result = job 
                        inbox.Post(DoneJob(result, reply)) }
                |> Async.Start
                return! loop(jobs, count + 1) //job started, go back to waiting
        }
        loop([], 0)
    )
    member m.AddJob(job) = agent.PostAndAsyncReply(fun rep-> AddJob(job, rep))
    member m.Stop() = agent.Post(Stop)

In my particular case, I just need to use it as a 'one shot' 'map', so I added a static function:

    static member RunJobs limit jobs = 
        let agent = ThrottleAgent<'a>(limit)
        let res = jobs |> Seq.map (fun job -> agent.AddJob(job))
                       |> Async.Parallel
                       |> Async.RunSynchronously
        agent.Stop()
        res

It seems to work ok...

一世旳自豪 2024-11-20 05:39:05

Here's an out-of-the-box solution:

FSharpx.Control offers an Async.ParallelWithThrottle function. I'm not sure if it is the best implementation, as it uses SemaphoreSlim. But the ease of use is great, and since my application doesn't need top performance, it works well enough for me. That said, since it is a library, if someone knows how to make it better, it is always nice to make libraries top performers out of the box, so the rest of us can use code that works and get our work done!
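For readers who want to see what a SemaphoreSlim-based throttle might look like, here is a minimal sketch. It is not the FSharpx implementation. The function name `parallelWithThrottle` and its argument order are assumptions made for illustration.

```fsharp
open System.Threading

/// Hypothetical helper: runs the workflows in parallel,
/// with at most 'limit' of them executing at any one time.
let parallelWithThrottle (limit: int) (jobs: seq<Async<'a>>) : Async<'a[]> = async {
    use semaphore = new SemaphoreSlim(limit)
    let throttled (job: Async<'a>) = async {
        // Wait (without blocking a thread) until a slot is free
        do! semaphore.WaitAsync() |> Async.AwaitTask
        try
            return! job
        finally
            // Free the slot whether the job succeeded or failed
            semaphore.Release() |> ignore }
    return! jobs |> Seq.map throttled |> Async.Parallel }
```

Results come back in the same order as the input, as with plain Async.Parallel. All workflows are still started up front. They simply wait on the semaphore before doing any real work.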
