Connection issue when using Parallel

Posted on 2025-01-19 06:41:34


I have a foreach loop that executes a certain set of statements. Part of that work is saving an image from a URL to Azure Storage. I have to do this for a large data set, so to speed it up I converted the foreach loop into a Parallel.ForEach loop.

 Parallel.ForEach(listSkills, item =>
 {
     // some business logic
     var b = getImageFromUrl(item.Url);
     Stream ms = new MemoryStream(b);

     saveImage(ms);
     // more business logic
 });

 private static byte[] getByteArray(Stream input)
 {
   using (MemoryStream ms = new MemoryStream())
   {
     input.CopyTo(ms);
     return ms.ToArray();
   }
 }

 public static byte[] getImageFromUrl(string url)
 {
    HttpWebRequest request = null;
    HttpWebResponse response = null;
    byte[] b = null;
    request = (HttpWebRequest)WebRequest.Create(url);
    response = (HttpWebResponse)request.GetResponse();
    if (request.HaveResponse)
    {
      if (response.StatusCode == HttpStatusCode.OK)
       {
          Stream receiveStream = response.GetResponseStream();
          b = getByteArray(receiveStream);
       }
    }

    return b;
 }

 public static void saveImage(Stream fileContent)
 {
  fileContent.Seek(0, SeekOrigin.Begin);
  byte[] bytes = getByteArray(fileContent);
  CloudBlockBlob blob = null; // blob type assumed; obtaining the blob reference is elided in the question
  blob.UploadFromByteArrayAsync(bytes, 0, bytes.Length).Wait();
 }

However, sometimes I get the error below and the image is not saved:

An existing connection was forcibly closed by the remote host.

Here is the stack trace:

   at System.Net.Sockets.NetworkStream.Read(Span`1 buffer)
   at System.Net.Security.SslStream.<FillBufferAsync>d__183`1.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Net.Security.SslStream.<ReadAsyncInternal>d__181`1.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Net.Security.SslStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.Stream.Read(Span`1 buffer)
   at System.Net.Http.HttpConnection.Read(Span`1 destination)
   at System.Net.Http.HttpConnection.ContentLengthReadStream.Read(Span`1 buffer)
   at System.Net.Http.HttpBaseStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.Stream.CopyTo(Stream destination, Int32 bufferSize)
   at Utilities.getByteArray(Stream input) in D:\repos\SampleProj\Sample\Helpers\CH.cs:line 238
   at Utilities.getImageFromUrl(String url) in D:\repos\SampleProj\Sample\Helpers\CH.cs:line 178

My guess is that this happens because I am not using locks, but I am unsure whether I should use locks inside a Parallel.ForEach loop.


烟花易冷人易散 2025-01-26 06:41:34

According to another question on Stack Overflow, these are the potential causes of "An existing connection was forcibly closed by the remote host":

  • You are sending malformed data to the application (which could include sending an HTTPS request to an HTTP server)
  • The network link between the client and server is going down for some reason
  • You have triggered a bug in the third-party application that caused it to crash
  • The third-party application has exhausted system resources

Since only some of your requests are affected, I think we can exclude the first one. It could, of course, be a network issue, in which case it would happen from time to time depending on the quality of the network between you and the server.

Unless other users report a bug in the remote service (note that the stack trace shows the failure inside getImageFromUrl, while downloading the image, not during the Azure Storage upload), there is a high probability that your calls are consuming too many of the remote server's resources (connections/data) at the same time. Servers and proxies limit how many connections they can handle at once (especially from the same client machine).

Depending on the size of your listSkills list, your code may launch a large number of requests in parallel (as many as the thread pool allows), possibly flooding the server.

At the very least, you could limit the number of tasks running in parallel with MaxDegreeOfParallelism, like this:

Parallel.ForEach(listSkills,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    item =>
    {
         // some business logic
         var b = getImageFromUrl(item.Url);
         Stream ms = new MemoryStream(b);
    
         saveImage(ms);
         // more business logic
    });
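To see what the cap does without touching the network, here is a minimal offline sketch. It assumes Thread.Sleep is an acceptable stand-in for the blocking getImageFromUrl/saveImage calls; the class, method, and counter names are hypothetical. It counts how many loop bodies are running at once and reports the peak, which should never exceed MaxDegreeOfParallelism.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Offline sketch: Thread.Sleep is a hypothetical stand-in for the blocking
// download + upload work done in each loop body.
public static class BoundedParallelForEachDemo
{
    // Counts bodies currently running and remembers the highest count seen.
    private sealed class Meter
    {
        private int _current, _peak, _processed;
        public int Peak => Volatile.Read(ref _peak);
        public int Processed => Volatile.Read(ref _processed);

        public void Enter()
        {
            int now = Interlocked.Increment(ref _current);
            int seen;
            // Lock-free "max": retry if another thread changed _peak in the meantime.
            while (now > (seen = Volatile.Read(ref _peak)) &&
                   Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
        }

        public void Exit()
        {
            Interlocked.Decrement(ref _current);
            Interlocked.Increment(ref _processed);
        }
    }

    public static (int Peak, int Processed) Run(int itemCount, int maxParallel)
    {
        var meter = new Meter();
        var items = Enumerable.Range(0, itemCount).ToList();

        Parallel.ForEach(items,
            new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
            item =>
            {
                meter.Enter();
                try { Thread.Sleep(20); } // simulated blocking I/O for this item
                finally { meter.Exit(); }
            });

        return (meter.Peak, meter.Processed);
    }
}
```

The cap only limits how many requests are in flight; each body still blocks a thread-pool thread while it waits, which is the point the other answers raise.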

回忆凄美了谁 2025-01-26 06:41:34

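You can control the degree of parallelism like this:

listSkills.AsParallel()
          .Select(item => { /* your logic */ return item; })
          .WithDegreeOfParallelism(10)
          .ForAll(item =>
          {
              // ForAll actually runs the query; a trailing Select alone is lazy and never executes
              var b = getImageFromUrl(item.Url);
              saveImage(new MemoryStream(b));
          });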
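A small offline sketch of the same PLINQ idea, with Thread.Sleep as a hypothetical stand-in for the download and upload (the names are illustrative only). The query does nothing until it is enumerated, here by ToArray, and the number of bodies running at once should stay at or below the WithDegreeOfParallelism value.

```csharp
using System;
using System.Linq;
using System.Threading;

// Offline sketch: Thread.Sleep is a hypothetical stand-in for getImageFromUrl + saveImage.
public static class PlinqThrottleDemo
{
    // Tracks how many query bodies are running and the highest count seen.
    private sealed class Meter
    {
        private int _current, _peak;
        public int Peak => Volatile.Read(ref _peak);

        public void Enter()
        {
            int now = Interlocked.Increment(ref _current);
            int seen;
            while (now > (seen = Volatile.Read(ref _peak)) &&
                   Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
        }

        public void Exit() => Interlocked.Decrement(ref _current);
    }

    public static (int Peak, int[] Results) Run(int itemCount, int degree)
    {
        var meter = new Meter();

        int[] results = Enumerable.Range(0, itemCount)
            .AsParallel()
            .WithDegreeOfParallelism(degree) // at most `degree` concurrent tasks
            .Select(item =>
            {
                meter.Enter();
                try
                {
                    Thread.Sleep(20); // simulated blocking work
                    return item * 2;
                }
                finally { meter.Exit(); }
            })
            .ToArray(); // PLINQ is lazy: the work only runs when the query is enumerated

        return (meter.Peak, results); // results are unordered (no AsOrdered)
    }
}
```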

But Parallel.ForEach is not a good fit for I/O, because it is designed for CPU-intensive work. If you use it for I/O-bound operations, especially web requests, you waste thread-pool threads that sit blocked while waiting for the response.

Use asynchronous web request methods such as HttpWebRequest.GetResponseAsync instead. In addition, you can throttle the requests with a synchronization construct such as a semaphore. A semaphore works like a queue: it lets X callers through, and the rest wait until one of the busy ones finishes its work.
First, make your getImageFromUrl method async, like this (not a perfect solution, but a better one):

public static async Task getImageFromUrl(SemaphoreSlim semaphore, string url)
{
    try
    {
        var request = (HttpWebRequest)WebRequest.Create(url);
        using (var response = await request.GetResponseAsync().ConfigureAwait(false))
        using (var receiveStream = response.GetResponseStream())
        {
            // your logic, e.g. read receiveStream and save it
        }
    }
    catch (Exception ex)
    {
        // handle or log the exception
    }
    finally
    {
        // always give the slot back, even if the request failed
        semaphore.Release();
    }
}

Then:

var tasks = new List<Task>();
using (var semaphore = new SemaphoreSlim(10))
{
    foreach (var url in urls)
    {
        // await here until there is room for this task
        await semaphore.WaitAsync();
        tasks.Add(getImageFromUrl(semaphore, url));
    }
    // await the rest of the tasks before the semaphore is disposed
    await Task.WhenAll(tasks);
}
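The following self-contained sketch exercises the same throttling pattern offline. FakeDownloadAsync and the counters are hypothetical, and Task.Delay stands in for the HTTP call. Each item waits on SemaphoreSlim.WaitAsync before starting and releases its slot in a finally block, so the number of in-flight operations should never exceed the semaphore's initial count.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Offline sketch: FakeDownloadAsync is hypothetical; Task.Delay stands in for
// GetResponseAsync and reading the response body.
public static class SemaphoreThrottleDemo
{
    // Tracks in-flight operations, the highest count seen, and completions.
    private sealed class Meter
    {
        private int _current, _peak, _processed;
        public int Peak => Volatile.Read(ref _peak);
        public int Processed => Volatile.Read(ref _processed);

        public void Enter()
        {
            int now = Interlocked.Increment(ref _current);
            int seen;
            while (now > (seen = Volatile.Read(ref _peak)) &&
                   Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
        }

        public void Exit()
        {
            Interlocked.Decrement(ref _current);
            Interlocked.Increment(ref _processed);
        }
    }

    private static async Task FakeDownloadAsync(SemaphoreSlim semaphore, Meter meter, int item)
    {
        meter.Enter();
        try
        {
            await Task.Delay(20); // asynchronous wait: no thread is blocked
        }
        finally
        {
            meter.Exit();        // leave before releasing so the next item never overlaps
            semaphore.Release(); // always return the slot, even on failure
        }
    }

    public static async Task<(int Peak, int Processed)> RunAsync(int itemCount, int maxConcurrent)
    {
        var meter = new Meter();
        var tasks = new List<Task>();
        using (var semaphore = new SemaphoreSlim(maxConcurrent))
        {
            foreach (var item in Enumerable.Range(0, itemCount))
            {
                await semaphore.WaitAsync(); // wait asynchronously for a free slot
                tasks.Add(FakeDownloadAsync(semaphore, meter, item));
            }
            await Task.WhenAll(tasks); // finish before the semaphore is disposed
        }
        return (meter.Peak, meter.Processed);
    }
}
```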

You should not use Parallel or Task.Run here. Instead, you can write an async handler method like this:

public async Task handleResponse(Task<HttpResponseMessage> responseTask)
{
    HttpResponseMessage response = await responseTask;
    // process your data
}

Then use Task.WhenAll:

// assumes a getImageFromUrl variant that returns Task<HttpResponseMessage>
Task[] requests = myList.Select(l => getImageFromUrl(l.Id))
                        .Select(r => handleResponse(r))
                        .ToArray(); // starts every request at once, with no throttling
await Task.WhenAll(requests);
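Here is a runnable offline sketch of the same composition, with a hypothetical FetchAsync (Task.Delay plus a fake payload) in place of the real request and a handler that just measures the payload. Task.WhenAll returns the handler results in input order. Keep in mind that ToArray starts every request immediately, so for a large list this would still need a throttle such as the semaphore from the previous snippet.

```csharp
using System.Linq;
using System.Threading.Tasks;

// Offline sketch: FetchAsync is hypothetical and stands in for the HTTP request.
public static class WhenAllDemo
{
    private static async Task<string> FetchAsync(int id)
    {
        await Task.Delay(10);   // simulated network latency
        return $"payload-{id}"; // fake response body
    }

    // Awaits an already-started request, then "processes" it (here: its length).
    private static async Task<int> HandleResponseAsync(Task<string> responseTask)
    {
        string payload = await responseTask;
        return payload.Length;
    }

    public static async Task<int[]> RunAsync(int[] ids)
    {
        Task<int>[] work = ids.Select(id => FetchAsync(id))
                              .Select(t => HandleResponseAsync(t))
                              .ToArray(); // every request starts here, unthrottled
        return await Task.WhenAll(work);  // results come back in input order
    }
}
```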

In the end, there are several solutions for your scenario, but forget Parallel.ForEach and use one of the asynchronous approaches instead.

攒眉千度 2025-01-26 06:41:34

There are several problems with this code:

  • Parallel.ForEach is meant for data parallelism, not I/O. The code ties up thread-pool threads that do nothing but wait for I/O to complete.
  • HttpWebRequest is a wrapper over HttpClient in .NET Core. Using HttpWebRequest is inefficient and far more complex than needed.
  • HttpClient can retrieve or post stream contents, so there is no reason to load the contents into memory. HttpClient is also thread-safe and meant to be reused.

There are several ways to execute many IO operations concurrently in .NET Core.

.NET 6

In the current Long-Term-Support version of .NET at the time of writing, .NET 6, this can be done using Parallel.ForEachAsync. Scott Hanselman shows how easy it is to use it for API calls.

You can retrieve the data directly with GetByteArrayAsync:

record CopyRequest(Uri sourceUri, Uri blobUri);
...
var requests = new List<CopyRequest>();
// Load some source/target URLs

var client = new HttpClient(); // one shared instance for all requests
// The ForEachAsync body receives the item and a CancellationToken
await Parallel.ForEachAsync(requests, async (req, ct) =>
{
    var bytes = await client.GetByteArrayAsync(req.sourceUri, ct);
    var blob = new CloudAppendBlob(req.blobUri);
    await blob.UploadFromByteArrayAsync(bytes, 0, bytes.Length);
});
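To try Parallel.ForEachAsync without a network or a storage account, the sketch below plugs a hypothetical StubHandler into a single shared HttpClient and records byte counts in a ConcurrentDictionary instead of uploading to a blob. It assumes .NET 6 or later (for Parallel.ForEachAsync and the GetByteArrayAsync overload that takes a CancellationToken) and sets MaxDegreeOfParallelism explicitly; without it, ForEachAsync defaults to Environment.ProcessorCount.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Offline sketch (.NET 6+). StubHandler and the in-memory "blob store" are
// hypothetical stand-ins for the real image host and Azure Storage.
public static class ForEachAsyncDemo
{
    // Answers every GET with a zero-filled body whose size is the last URL segment.
    private sealed class StubHandler : HttpMessageHandler
    {
        protected override async Task<HttpResponseMessage> SendAsync(
            HttpRequestMessage request, CancellationToken cancellationToken)
        {
            await Task.Delay(10, cancellationToken); // simulated network latency
            int size = int.Parse(request.RequestUri!.Segments.Last());
            return new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new ByteArrayContent(new byte[size])
            };
        }
    }

    public static async Task<IDictionary<Uri, int>> RunAsync(IEnumerable<Uri> sources, int maxParallel)
    {
        using var client = new HttpClient(new StubHandler()); // created once, shared by all items
        var blobStore = new ConcurrentDictionary<Uri, int>();  // stands in for blob uploads

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        await Parallel.ForEachAsync(sources, options, async (uri, ct) =>
        {
            byte[] bytes = await client.GetByteArrayAsync(uri, ct);
            blobStore[uri] = bytes.Length; // a real version would upload `bytes` here
        });

        return blobStore;
    }
}
```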

A better option would be to retrieve the data as a stream and send it directly to the blob:

await Parallel.ForEachAsync(requests, async (req, ct) =>
{
    using var response = await client.GetAsync(req.sourceUri,
                             HttpCompletionOption.ResponseHeadersRead, ct);
    response.EnsureSuccessStatusCode();
    using var sourceStream = await response.Content.ReadAsStreamAsync();
    var blob = new CloudAppendBlob(req.blobUri);
    await blob.UploadFromStreamAsync(sourceStream);
});

HttpCompletionOption.ResponseHeadersRead causes GetAsync to return as soon as the response headers are received, without buffering any of the response data.
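The streaming variant can be sketched offline as well. StubHandler is hypothetical, and a MemoryStream plays the role of the blob upload target; the point is the call sequence: GetAsync with ResponseHeadersRead, EnsureSuccessStatusCode, ReadAsStreamAsync, then copying the stream onward. It assumes .NET Core 3.0 or later for `await using`.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Offline sketch: StubHandler is hypothetical and serves a fixed payload as a stream.
public static class StreamingCopyDemo
{
    private sealed class StubHandler : HttpMessageHandler
    {
        private readonly byte[] _payload;
        public StubHandler(byte[] payload) => _payload = payload;

        protected override Task<HttpResponseMessage> SendAsync(
            HttpRequestMessage request, CancellationToken cancellationToken) =>
            Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StreamContent(new MemoryStream(_payload))
            });
    }

    public static async Task<byte[]> CopyAsync(byte[] payload, Uri source)
    {
        using var client = new HttpClient(new StubHandler(payload));

        // Returns as soon as the headers arrive; the body is read from the stream below.
        using HttpResponseMessage response =
            await client.GetAsync(source, HttpCompletionOption.ResponseHeadersRead);
        response.EnsureSuccessStatusCode();

        await using Stream body = await response.Content.ReadAsStreamAsync();
        var destination = new MemoryStream(); // stands in for blob.UploadFromStreamAsync(body)
        await body.CopyToAsync(destination);
        return destination.ToArray();
    }
}
```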

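.NET Core 3.1

In older .NET Core versions (which were reaching end-of-life within a few months at the time of writing), you can use, for example, an ActionBlock with a MaxDegreeOfParallelism greater than 1:

var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };

var copyBlock = new ActionBlock<CopyRequest>(async req =>
{
    using var response = await client.GetAsync(req.sourceUri,
                             HttpCompletionOption.ResponseHeadersRead);
    response.EnsureSuccessStatusCode();
    using var sourceStream = await response.Content.ReadAsStreamAsync();
    var blob = new CloudAppendBlob(req.blobUri);
    await blob.UploadFromStreamAsync(sourceStream);
}, options);

foreach (var req in requests) copyBlock.Post(req);
copyBlock.Complete();       // signal that no more items will be posted
await copyBlock.Completion; // wait for all copies to finish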

The block classes in the TPL Dataflow library can be used to construct processing pipelines similar to a shell script pipeline, with each block piping its output to the next block.
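A minimal offline sketch of the ActionBlock approach, assuming the System.Threading.Tasks.Dataflow assembly is available (depending on the target framework, it may need to be added as the System.Threading.Tasks.Dataflow NuGet package). Task.Delay is a hypothetical stand-in for the download and upload. The block is fed with Post, told there is no more input with Complete, and awaited through Completion; the measured peak should not exceed MaxDegreeOfParallelism.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Offline sketch: Task.Delay is a hypothetical stand-in for download + upload.
public static class ActionBlockDemo
{
    // Tracks in-flight items, the highest count seen, and completions.
    private sealed class Meter
    {
        private int _current, _peak, _processed;
        public int Peak => Volatile.Read(ref _peak);
        public int Processed => Volatile.Read(ref _processed);

        public void Enter()
        {
            int now = Interlocked.Increment(ref _current);
            int seen;
            while (now > (seen = Volatile.Read(ref _peak)) &&
                   Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
        }

        public void Exit()
        {
            Interlocked.Decrement(ref _current);
            Interlocked.Increment(ref _processed);
        }
    }

    public static async Task<(int Peak, int Processed)> RunAsync(int itemCount, int maxParallel)
    {
        var meter = new Meter();
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel };

        // With an async delegate, an item counts as "in progress" until its Task completes.
        var copyBlock = new ActionBlock<int>(async item =>
        {
            meter.Enter();
            try { await Task.Delay(20); } // simulated download + upload
            finally { meter.Exit(); }
        }, options);

        foreach (var item in Enumerable.Range(0, itemCount))
            copyBlock.Post(item);

        copyBlock.Complete();       // no more input
        await copyBlock.Completion; // wait until every posted item is processed
        return (meter.Peak, meter.Processed);
    }
}
```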
