How to achieve asynchrony rather than parallelism in F#

Posted on 2024-10-04 17:01:10

(Sticking to a common example with async fetch of many web pages)

How would I spin off hundreds of web page requests asynchronously, and then wait for all of them to complete before going on to the next step? Async.Parallel seems to process only a few requests at a time, apparently governed by the number of cores on the CPU. Grabbing a web page is not a CPU-bound operation. Not satisfied with the speedup from Async.Parallel, I am looking for alternatives.

I tried to connect the dots between Async.StartAsTask and Task.WaitAll. Instinctively, I wrote the following code, but it does not compile.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 

How would you approach this?
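
A note on the compile error: it most likely comes from Task.WaitAll expecting a Task[] while the snippet passes a seq<Task<'T>>, and from Tasks.Task only resolving when System.Threading is opened. A minimal sketch of a version that should compile is below. fetchAsync is a hypothetical stand-in, since the question does not show it, and the call still blocks the calling thread while waiting.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for the question's fetchAsync (not shown in the post)
let fetchAsync (item: int) : Async<int> =
    async {
        do! Async.Sleep 10
        return item * 2
    }

let processItemsConcurrently (items: int seq) =
    // Materialize the sequence so every task is started exactly once
    let tasks =
        items
        |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
        |> Seq.toArray
    // Task.WaitAll takes Task[], so upcast each Task<int> to Task
    Task.WaitAll(tasks |> Array.map (fun t -> t :> Task))
    tasks |> Array.map (fun t -> t.Result)
```

Materializing with Seq.toArray matters here: a lazy seq would start a fresh set of tasks each time it is enumerated.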

Comments (5)

遗心遗梦遗幸福 2024-10-11 17:01:10

Async.Parallel is almost certainly the right tool here. I'm not sure what you're unhappy with; the strength of F# asyncs lies more in asynchronous computation than in task-parallel CPU-bound work (which is better served by Tasks and the .NET 4.0 TPL). Here's a full example:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, .NET Framework client apps allow only 2 concurrent connections per host, so raise the limit
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

On my 4-core box, this consistently gives a nearly 4x speedup.

EDIT

In reply to your comment, I've updated the code. You're right in that I've added more sites and am not seeing the expected speedup (still holding steady around 4x). I've started adding a little debugging output above, will continue investigating to see if something else is throttling the connections...

EDIT

Edited the code again. Well, I found what might be the bottleneck. Here's the implementation of AsyncReadToEnd in the PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

In other words, it just blocks a threadpool thread and reads synchronously. Argh!!! Let me see if I can work around that.
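
For contrast, here is a minimal sketch of a read-to-end that does not park a thread on a synchronous ReadToEnd. It assumes the Stream.AsyncRead extension from FSharp.Core; readAllTextAsync is a hypothetical helper name and not part of the PowerPack.

```fsharp
open System.IO
open System.Text

// Hypothetical helper: reads the whole stream with asynchronous chunked reads
let readAllTextAsync (stream: Stream) (encoding: Encoding) : Async<string> =
    async {
        let buffer = Array.zeroCreate<byte> 4096
        use collected = new MemoryStream()
        let rec loop () =
            async {
                // AsyncRead yields 0 once the end of the stream is reached
                let! n = stream.AsyncRead(buffer, 0, buffer.Length)
                if n > 0 then
                    collected.Write(buffer, 0, n)
                    return! loop ()
            }
        do! loop ()
        // Decode once at the end so multi-byte characters are never split across chunks
        return encoding.GetString(collected.ToArray())
    }
```

The bytes are gathered first and decoded in one step, which keeps the sketch short at the cost of holding the whole response in memory.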

EDIT

Ok, the AsyncStreamReader in the PowerPack does the right thing, and I'm using that now.

However, the key issue seems to be variance.

When you hit, say, cnn.com, most of the time the result comes back in about 500 ms. But every once in a while you get that one request that takes 4 s, and this can kill the apparent async performance, since the overall time is the time of the unluckiest request.

Running the program above, I see speedups from about 2.5x to 9x on my 2-core box at home. It is very highly variable, though. It's still possible there's some bottleneck in the program that I've missed, but I think the variance-of-the-web may account for all of what I'm seeing at this point.
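
The "unluckiest request" effect can be reproduced without the network. The sketch below replaces real requests with Async.Sleep; the latencies in delaysMs are made-up values and simulateRequest is a hypothetical stand-in.

```fsharp
open System.Diagnostics

// Made-up latencies in milliseconds; one request is an outlier
let delaysMs = [| 50; 60; 55; 400; 45 |]

// Hypothetical stand-in for a web request: just waits, then reports its delay
let simulateRequest (ms: int) : Async<int> =
    async {
        do! Async.Sleep ms
        return ms
    }

let sw = Stopwatch.StartNew()
let results =
    delaysMs
    |> Array.map simulateRequest
    |> Async.Parallel
    |> Async.RunSynchronously
let elapsedMs = sw.ElapsedMilliseconds

// The total tracks the slowest request rather than the sum of all of them
printfn "slowest request: %dms, sum: %dms, elapsed: %dms"
    (Array.max delaysMs) (Array.sum delaysMs) elapsedMs
```

With these numbers a sequential run would need roughly the sum of the delays, while the concurrent run is bounded below by the single 400 ms outlier, so one slow site caps the achievable speedup.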

樱娆 2024-10-11 17:01:10

Using the Reactive Extensions for .NET together with F#, you can write a very elegant solution. Check out the sample at http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (it uses C#, but doing the same in F# is easy too). The key is to use the Begin/End methods instead of the synchronous method: even if you get the synchronous version to compile, it needlessly ties up n ThreadPool threads, whereas with Begin/End the ThreadPool just picks up completion routines as they come in.
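
Without Rx, the same Begin/End idea can be sketched in plain F# with Async.FromBeginEnd. The example uses Stream.BeginRead and Stream.EndRead on an in-memory stream so it runs without a network; readChunk and demo are hypothetical names chosen for illustration.

```fsharp
open System.IO

// Wraps a Begin/End pair in an F# async. For a network stream the wait
// is handled by a completion callback instead of a blocked thread.
let readChunk (stream: Stream) (buffer: byte[]) : Async<int> =
    Async.FromBeginEnd(buffer, 0, buffer.Length, stream.BeginRead, stream.EndRead)

let demo () =
    let data = [| 1uy .. 10uy |]
    use stream = new MemoryStream(data)
    let buffer = Array.zeroCreate<byte> 16
    // Returns the number of bytes read plus the bytes themselves
    let n = readChunk stream buffer |> Async.RunSynchronously
    n, buffer.[0 .. n - 1]
```

The same pattern should apply to any API that exposes a Begin/End pair, such as BeginGetResponse and EndGetResponse on a web request.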

遮云壑 2024-10-11 17:01:10

My bet is that the speedup isn't as large as you'd like because you're using either a subtype of WebRequest or a class that relies on it (such as WebClient).
If that's the case, you need to set MaxConnection on the ConnectionManagementElement to a high value, depending on how many simultaneous connections you want to open from your application. I suggest you only set it if needed; otherwise it's going to become a pretty time-consuming operation.

爱已欠费 2024-10-11 17:01:10

I'm not an F# guy, but from a pure .NET perspective what you're looking for is TaskFactory::FromAsync, where the asynchronous call you'd wrap in a Task would be something like HttpWebRequest::BeginGetResponse. You could also wrap the EAP model that WebClient exposes using a TaskCompletionSource. There is more on both of these topics on MSDN.

Hopefully with this knowledge you can find the nearest native F# approach to accomplish what you're trying to do.
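
On the F# side, a Task built this way can be consumed with Async.AwaitTask and then combined with Async.Parallel. The sketch below shows the TaskCompletionSource half of the idea; startWork is a hypothetical callback-style operation standing in for an EAP API such as the one WebClient exposes.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical callback-style operation: reports its result on a thread pool thread
let startWork (input: int) (onDone: int -> unit) =
    ThreadPool.QueueUserWorkItem(fun _ -> onDone (input * 2)) |> ignore

// Bridge the callback to a Task with TaskCompletionSource
let workAsTask (input: int) : Task<int> =
    let tcs = TaskCompletionSource<int>()
    startWork input (fun result -> tcs.SetResult result)
    tcs.Task

// Consume the tasks from F# asyncs and wait for all of them
let results =
    [ 1 .. 5 ]
    |> List.map (fun i -> workAsTask i |> Async.AwaitTask)
    |> Async.Parallel
    |> Async.RunSynchronously
```

A real wrapper would also forward failures and cancellation through SetException and SetCanceled, which this sketch leaves out.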

冰火雁神 2024-10-11 17:01:10

Here's some code that avoids the unknowns, such as web access latency. I am getting under 5% CPU utilization, and about 60-80% efficiency for both sync and async code paths.

open System.Diagnostics

let numWorkers = 200
let asyncDelay = 50

let main =
   let codeBlocks = [for i in 1..numWorkers -> 
                        async { do! Async.Sleep asyncDelay } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main