昂贵的异步读取响应流

发布于 2024-12-12 05:45:04 字数 2903 浏览 0 评论 0原文

在过去的几天里我一直在尝试学习 F#,但我总是遇到一些让我困惑的事情。我的“学习项目”是一个屏幕抓取工具,用于处理我感兴趣的一些数据。

在 F# PowerPack 中,有一个调用 Stream.AsyncReadToEnd。我不想仅在一次调用中使用 PowerPack,因此我了解了他们是如何做到的。

module Downloader =
    open System
    open System.IO
    open System.Net
    open System.Collections

    type public BulkDownload(uriList : IEnumerable) =
        member this.UriList with get() = uriList

        member this.ParalellDownload() =
            let Download (uri : Uri) = async {
                let UnblockViaNewThread f = async {
                    do! Async.SwitchToNewThread()
                    let res = f()
                    do! Async.SwitchToThreadPool()
                    return res }

                let request = HttpWebRequest.Create(uri)
                let! response = request.AsyncGetResponse()
                use responseStream = response.GetResponseStream()
                use reader = new StreamReader(responseStream)
                let! contents = UnblockViaNewThread (fun() -> reader.ReadToEnd())
                return uri, contents.ToString().Length }

            this.UriList
            |> Seq.cast
            |> Seq.map Download
            |> Async.Parallel
            |> Async.RunSynchronously

他们有 UnblockViaNewThread 函数。这真的是异步读取响应流的唯一方法吗?创建一个新线程不是很昂贵吗(我已经看到“~1mb 内存”到处乱扔)。有更好的方法吗?这是每次 Async* 调用(我可以 let!)中真正发生的情况吗?

编辑:我遵循 Tomas 的建议,实际上想出了一些独立于 F# PowerTools 的东西。这里是。这确实需要错误处理,但它异步请求并将 url 下载到字节数组。

namespace Downloader
open System
open System.IO
open System.Net
open System.Collections

type public BulkDownload(uriList : IEnumerable) =
    member this.UriList with get() = uriList

    member this.ParalellDownload() =                
        let Download (uri : Uri) = async {
            let processStreamAsync (stream : Stream) = async { 
                let outputStream = new MemoryStream()
                let buffer = Array.zeroCreate<byte> 0x1000
                let completed = ref false
                while not (!completed) do
                    let! bytesRead = stream.AsyncRead(buffer, 0, 0x1000)
                    if bytesRead = 0 then
                        completed := true
                    else
                        outputStream.Write(buffer, 0, bytesRead)
                stream.Close()
                return outputStream.ToArray() }

            let request = HttpWebRequest.Create(uri)
            let! response = request.AsyncGetResponse()
            use responseStream = response.GetResponseStream()
            let! contents = processStreamAsync responseStream
            return uri, contents.Length }

        this.UriList
        |> Seq.cast
        |> Seq.map Download
        |> Async.Parallel
        |> Async.RunSynchronously

    override this.ToString() = String.Join(", ", this.UriList)

I have been trying to learn F# for the past couple of day and I keep running into something that perplexes me. My "learning project" is a screen scraper for some data I'm kind of interested in manipulating.

In F# PowerPack there is a call Stream.AsyncReadToEnd. I did not want to use the PowerPack just for that single call so I took a look at how they did it.

module Downloader =
    open System
    open System.IO
    open System.Net
    open System.Collections

    type public BulkDownload(uriList : IEnumerable) =
        member this.UriList with get() = uriList

        member this.ParalellDownload() =
            let Download (uri : Uri) = async {
                let UnblockViaNewThread f = async {
                    do! Async.SwitchToNewThread()
                    let res = f()
                    do! Async.SwitchToThreadPool()
                    return res }

                let request = HttpWebRequest.Create(uri)
                let! response = request.AsyncGetResponse()
                use responseStream = response.GetResponseStream()
                use reader = new StreamReader(responseStream)
                let! contents = UnblockViaNewThread (fun() -> reader.ReadToEnd())
                return uri, contents.ToString().Length }

            this.UriList
            |> Seq.cast
            |> Seq.map Download
            |> Async.Parallel
            |> Async.RunSynchronously

They have that function UnblockViaNewThread. Is that really the only way to asynchronously read the response stream? Isn't creating a new thread really expensive (I've seen the "~1mb of memory" thrown around all over the place). Is there a better way to do this? Is this what's really happenening in every Async* call (one that I can let!)?

EDIT: I follow Tomas' suggestions and actually came up with something independent of F# PowerTools. Here it is. This really needs error handling, but it asynchronous requests and downloads a url to a byte array.

namespace Downloader
open System
open System.IO
open System.Net
open System.Collections

type public BulkDownload(uriList : IEnumerable) =
    member this.UriList with get() = uriList

    member this.ParalellDownload() =                
        let Download (uri : Uri) = async {
            let processStreamAsync (stream : Stream) = async { 
                let outputStream = new MemoryStream()
                let buffer = Array.zeroCreate<byte> 0x1000
                let completed = ref false
                while not (!completed) do
                    let! bytesRead = stream.AsyncRead(buffer, 0, 0x1000)
                    if bytesRead = 0 then
                        completed := true
                    else
                        outputStream.Write(buffer, 0, bytesRead)
                stream.Close()
                return outputStream.ToArray() }

            let request = HttpWebRequest.Create(uri)
            let! response = request.AsyncGetResponse()
            use responseStream = response.GetResponseStream()
            let! contents = processStreamAsync responseStream
            return uri, contents.Length }

        this.UriList
        |> Seq.cast
        |> Seq.map Download
        |> Async.Parallel
        |> Async.RunSynchronously

    override this.ToString() = String.Join(", ", this.UriList)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

柠檬色的秋千 2024-12-19 05:45:04

我认为仅在单独的线程上同步调用 ReadToEnd 的 AsyncReadToEnd 是错误的。

F# PowerPack 还包含一个 AsyncStreamReader 类型,其中包含流读取的正确异步实现。它有一个 ReadLine 方法,该方法(异步)返回下一行,并且仅从源流下载一些块(使用异步 ReadAsync ,而不是在后台线程上运行)。

let processStreamAsync stream = async { 
  use asyncReader = new AsyncStreamReader(stream)
  let completed = ref false
  while not (!completed) do 
    // Asynchrnously get the next line
    let! nextLine = asyncReader.ReadLine()
    if nextLine = null then completed := true
    else
       (* process the next line *)  }

如果您想将整个内容作为字符串下载(而不是逐行处理),那么您可以使用 AsyncStreamReaderReadToEnd 方法。这是一个正确的异步实现,它开始(异步)下载数据块并重复此操作而不会阻塞。

async { 
  use asyncReader = new AsyncStreamReader(stream)
  return! asyncReader.ReadToEnd() }

此外,F# PowerPack 是开源的并且具有宽松的许可证,因此使用它的最佳方法通常是将您需要的几个文件复制到项目中。

I think that AsyncReadToEnd that just synchronously calls ReadToEnd on a separate thread is wrong.

The F# PowerPack also contains a type AsyncStreamReader that contains proper asynchronous implementation of stream reading. It has a ReadLine method that (asynchronously) returns the next line and only downloads a few chunks from the source stream (using the asynchronous ReadAsync as opposed to running on a background thread).

let processStreamAsync stream = async { 
  use asyncReader = new AsyncStreamReader(stream)
  let completed = ref false
  while not (!completed) do 
    // Asynchrnously get the next line
    let! nextLine = asyncReader.ReadLine()
    if nextLine = null then completed := true
    else
       (* process the next line *)  }

If you want to download the whole content as a string (instead of processing it line-by-line), then you can use ReadToEnd method of AsyncStreamReader. This is a proper asynchronous implementation that starts downloading block of data (asynchronously) and repeats this without blocking.

async { 
  use asyncReader = new AsyncStreamReader(stream)
  return! asyncReader.ReadToEnd() }

Also, F# PowerPack is open-souorce and has permissive license, so the best way to use it is often to just copy the few files you need into your project.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文