C#: Speeding up writes from a response stream to a ViewStream

Posted on 2025-02-04 03:18:14

I have this code that asynchronously splits a file into pieces and downloads them using HTTP range requests. It then writes the downloaded data to a ViewStream on a memory-mapped file. I am currently reading from the response stream into a buffer, then writing all the data from the buffer into the ViewStream. Is there a more efficient or faster way to do this? I am not really concerned about memory use, but I am trying to maximize speed. pieces is a list of value tuples giving the (Start, End) of each piece of the file, and httpPool is an object pool holding a number of preconfigured HTTP clients. Any help is greatly appreciated, thank you!

await Parallel.ForEachAsync(pieces,
    new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
    async (piece, cancellationToken) =>
    {
        //Get a http client from the pool and request for the content range
        var client = httpPool.Get();
        var request = new HttpRequestMessage { RequestUri = new Uri(url) };
        request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);

        //Request headers so we dont cache the file into memory
        if (client != null)
        {
            var message = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

            if (message.IsSuccessStatusCode)
            {
                //Get the content stream from the message request
                using (var streamToRead = await message.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
                {
                    //Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
                    using (var streams = mmf.CreateViewStream(piece.Item1, message.Content.Headers.ContentLength!.Value, MemoryMappedFileAccess.Write))
                    {
                        //Copy from the content stream to the mmf stream
                        var buffer = new byte[bufferSize];
                        int offset, bytesRead;
                        // Until we've read everything
                        do
                        {
                            offset = 0;
                            // Until the buffer is very nearly full or there's nothing left to read
                            do
                            {
                                bytesRead = await streamToRead.ReadAsync(buffer.AsMemory(offset, bufferSize - offset), cancellationToken);
                                offset += bytesRead;
                            } while (bytesRead != 0 && offset < bufferSize);

                            // Empty the buffer
                            if (offset != 0)
                            {
                                await streams.WriteAsync(buffer.AsMemory(0, offset), cancellationToken);
                            }
                        } while (bytesRead != 0);

                        streams.Flush();
                        streams.Close();
                    }

                    streamToRead.Close();
                }
            }

            message.Content.Dispose();
            message.Dispose();
        }

        request.Dispose();
        httpPool.Return(client);
    });
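
The post does not show how pieces is built. As a minimal sketch, assuming the total file length is known up front and that each tuple holds inclusive byte offsets (which is how the two arguments of RangeHeaderValue are interpreted), the list could be computed as follows. SplitIntoPieces, totalLength and pieceSize are hypothetical names, not part of the original code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not from the original post): split the byte range
// [0, totalLength) into inclusive (Start, End) pieces of at most pieceSize bytes.
static List<(long Start, long End)> SplitIntoPieces(long totalLength, long pieceSize)
{
    if (totalLength < 0) throw new ArgumentOutOfRangeException(nameof(totalLength));
    if (pieceSize <= 0) throw new ArgumentOutOfRangeException(nameof(pieceSize));

    var pieces = new List<(long Start, long End)>();
    for (long start = 0; start < totalLength; start += pieceSize)
    {
        // HTTP byte ranges are inclusive on both ends, so the last byte of a
        // piece is start + size - 1, clamped to the last byte of the file.
        long end = Math.Min(start + pieceSize, totalLength) - 1;
        pieces.Add((start, end));
    }
    return pieces;
}

// Example: a 10-byte file in 4-byte pieces.
foreach (var (start, end) in SplitIntoPieces(10, 4))
{
    Console.WriteLine($"bytes={start}-{end}");
}
// prints:
// bytes=0-3
// bytes=4-7
// bytes=8-9
```

Named tuple elements can still be read as piece.Item1 and piece.Item2, so the download loop above would not need to change. With inclusive ranges, each piece covers End - Start + 1 bytes.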

Comments (1)

独﹏钓一江月 2025-02-11 03:18:14

I don't know how much it is going to help, but I tried to do something. How well does it work?

I also did some refactoring, so here are some notes:

  • Do not call .Close() or .Dispose() manually if you already have a using block or a using statement. All it does is add noise to your code and confuse anyone reading it. In fact, you should almost never need to call .Close() or .Dispose() manually at all.
  • Do you realize client would never be returned to the pool if an exception occurred in the method? You need to do that in a finally block, or by using an IDisposable struct that returns client to the pool in its Dispose() implementation. (Also, request would not be disposed if an exception occurred; add using.)
  • Whenever you can, prefer if statements that return early over ones that wrap the entire rest of the method. The latter are hard to read and maintain.
  • You are not really benefiting from Parallel, as 99% of the method is asynchronously waiting for IO. Just use Task.WhenAll() instead.
  • I got rid of the custom buffering/copying and just called the CopyToAsync() method on message.Content, which accepts a Stream. It should probably help performance; I reckon it is better optimized than the simplest possible hand-rolled buffer loop.
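
The IDisposable struct mentioned in the second note could look roughly like the sketch below. It is an illustration under assumptions: the real httpPool type is not shown in the post, so SimplePool is a hypothetical stand-in that only mimics its Get/Return calls, and PooledLease is a made-up name.

```csharp
using System;
using System.Collections.Concurrent;

// Usage: the item goes back to the pool even though the body throws.
var pool = new SimplePool<string>(() => "client");
try
{
    using var lease = new PooledLease<string>(pool);
    Console.WriteLine($"using {lease.Value}");
    throw new InvalidOperationException("simulated download failure");
}
catch (InvalidOperationException)
{
    // The lease was disposed while the exception unwound the try block.
    Console.WriteLine($"items back in pool: {pool.Count}");
}

// Hypothetical stand-in for the question's httpPool (only Get/Return are assumed).
sealed class SimplePool<T> where T : class
{
    private readonly ConcurrentBag<T> _items = new();
    private readonly Func<T> _factory;

    public SimplePool(Func<T> factory) => _factory = factory;

    public int Count => _items.Count;

    // Reuse a pooled item if one is available, otherwise create a new one.
    public T Get() => _items.TryTake(out var item) ? item : _factory();

    public void Return(T item) => _items.Add(item);
}

// Takes an item from the pool on construction and returns it on Dispose.
readonly struct PooledLease<T> : IDisposable where T : class
{
    private readonly SimplePool<T> _pool;

    public T Value { get; }

    public PooledLease(SimplePool<T> pool)
    {
        _pool = pool;
        Value = pool.Get();
    }

    public void Dispose() => _pool.Return(Value);
}
```

Compared with try/finally, the lease keeps acquiring and returning in one place, so an early return or an exception cannot skip the Return call.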

Code:

await Task.WhenAll(pieces.Select(p => DownloadToMemoryMappedFile(p)));

// change the piece type from dynamic to what you need
async Task DownloadToMemoryMappedFile(dynamic piece, CancellationToken cancellationToken = default)
{
    // Get an HTTP client from the pool
    var client = httpPool.Get();

    try
    {
        if (client is null)
            return;

        // Request only this piece's byte range
        using var request = new HttpRequestMessage { RequestUri = new Uri(url) };
        request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);

        // ResponseHeadersRead returns as soon as the headers arrive, so the body is streamed instead of buffered in memory
        using var message = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

        if (!message.IsSuccessStatusCode)
            return;

        // Create a view stream over the mmf at the piece offset, sized to the response length
        using var streams = mmf.CreateViewStream(piece.Item1, message.Content.Headers.ContentLength!.Value, MemoryMappedFileAccess.Write);

        // Copy the response body straight into the view stream
        await message.Content.CopyToAsync(streams, cancellationToken).ConfigureAwait(false);
    }
    finally
    {
        httpPool.Return(client);
    }
}
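
One behavioural difference is worth noting: Task.WhenAll over pieces.Select(...) starts every download at once, whereas the original Parallel.ForEachAsync call capped concurrency at MaxDegreeOfParallelism. If that cap matters, for example because the pool holds a limited number of clients, a SemaphoreSlim can bound the number of tasks in flight. The sketch below only illustrates the idea: RunThrottledAsync and maxConcurrency are hypothetical names, and Task.Delay stands in for the real download.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: start one task per item, with at most maxConcurrency running at a time.
static async Task RunThrottledAsync<T>(
    IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
{
    using var gate = new SemaphoreSlim(maxConcurrency);

    var tasks = items.Select(async item =>
    {
        // Wait for a free slot before doing any work.
        await gate.WaitAsync().ConfigureAwait(false);
        try
        {
            await work(item).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }).ToList(); // materialize so each task is created exactly once

    await Task.WhenAll(tasks).ConfigureAwait(false);
}

// Demo: 20 simulated downloads, at most 4 at a time.
var sync = new object();
int inFlight = 0, peak = 0;

await RunThrottledAsync(Enumerable.Range(0, 20), 4, async _ =>
{
    lock (sync)
    {
        inFlight++;
        peak = Math.Max(peak, inFlight); // record the highest concurrency seen
    }

    await Task.Delay(20); // stand-in for the HTTP download

    lock (sync)
    {
        inFlight--;
    }
});

Console.WriteLine($"peak concurrency: {peak}"); // never more than 4
```

Whether a cap helps or hurts throughput depends on the server and the network, so the value of maxConcurrency is something to measure rather than assume.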