Connection issue when using Parallel.ForEach
I have a foreach loop that executes a certain set of statements. Part of it saves an image from a URL to Azure Storage. I have to do this for a large set of data, so I converted the foreach loop into a Parallel.ForEach loop.
Parallel.ForEach(listSkills, item =>
{
    // some business logic
    var b = getImageFromUrl(item.Url);
    Stream ms = new MemoryStream(b);
    saveImage(ms);
    // more business logic
});

// Copies the whole input stream into a byte array.
private static byte[] getByteArray(Stream input)
{
    using (MemoryStream ms = new MemoryStream())
    {
        input.CopyTo(ms);
        return ms.ToArray();
    }
}

// Downloads the image synchronously; returns null unless the status is 200 OK.
public static byte[] getImageFromUrl(string url)
{
    HttpWebRequest request = null;
    HttpWebResponse response = null;
    byte[] b = null;
    request = (HttpWebRequest)WebRequest.Create(url);
    response = (HttpWebResponse)request.GetResponse();
    if (request.HaveResponse)
    {
        if (response.StatusCode == HttpStatusCode.OK)
        {
            Stream receiveStream = response.GetResponseStream();
            b = getByteArray(receiveStream);
        }
    }
    return b;
}

public static void saveImage(Stream fileContent)
{
    fileContent.Seek(0, SeekOrigin.Begin);
    byte[] bytes = getByteArray(fileContent);
    var blob = null; // placeholder: the blob reference is not shown in the question
    blob.UploadFromByteArrayAsync(bytes, 0, bytes.Length).Wait();
}
However, there are times when I get the error below and the image is not saved.
An existing connection was forcibly closed by the remote host.
Here is the stack trace:
at System.Net.Sockets.NetworkStream.Read(Span`1 buffer)
at System.Net.Security.SslStream.<FillBufferAsync>d__183`1.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Net.Security.SslStream.<ReadAsyncInternal>d__181`1.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Net.Security.SslStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.Stream.Read(Span`1 buffer)
at System.Net.Http.HttpConnection.Read(Span`1 destination)
at System.Net.Http.HttpConnection.ContentLengthReadStream.Read(Span`1 buffer)
at System.Net.Http.HttpBaseStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.Stream.CopyTo(Stream destination, Int32 bufferSize)
at Utilities.getByteArray(Stream input) in D:\repos\SampleProj\Sample\Helpers\CH.cs:line 238
at Utilities.getImageFromUrl(String url) in D:\repos\SampleProj\Sample\Helpers\CH.cs:line 178
I am guessing this may be because I am not using locks? I am unsure whether to use locks within a Parallel.ForEach loop.
Comments (3)
Another question on Stack Overflow lists the potential causes of "An existing connection was forcibly closed by the remote host."
Since only some of your requests are affected, I think we can exclude the first one. This can, of course, be a network issue; in that case it would happen from time to time, depending on the quality of the network between you and the server.
Unless you find indications of an Azure Storage bug from other users, there is a high probability that your calls are consuming too much of the remote server's resources (connections/data) at the same time. Servers and proxies limit how many connections they can handle at once, especially from the same client machine.
Depending on the size of your listSkills list, your code may launch a large number of requests in parallel (as many as your thread pool allows), possibly flooding the server. You could at least limit the number of parallel tasks launched by using MaxDegreeOfParallelism.
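A minimal sketch of capping the loop with `ParallelOptions.MaxDegreeOfParallelism` could look like the following. The `ThrottledLoop` class, the `Run` method, and the `processItem` delegate are hypothetical names introduced for illustration; `processItem` stands in for the question's download-and-save logic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledLoop
{
    // Runs processItem for every URL with at most maxParallel calls in flight.
    // processItem is a hypothetical stand-in for the download-and-save step.
    public static void Run(IEnumerable<string> urls, int maxParallel, Action<string> processItem)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        Parallel.ForEach(urls, options, processItem);
    }
}
```

With the question's helpers, a call might look like `ThrottledLoop.Run(urls, 4, url => saveImage(new MemoryStream(getImageFromUrl(url))));`, where `urls` holds each `item.Url` and 4 is an arbitrary starting value to tune against what the remote server tolerates.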
You can control the degree of parallelism of Parallel.ForEach through ParallelOptions.MaxDegreeOfParallelism.
But Parallel.ForEach is not a good fit for IO, because it is designed for CPU-intensive tasks. If you use it for IO-bound operations, especially web requests, you may waste thread pool threads that sit blocked while waiting for a response. You can use asynchronous web request methods such as HttpWebRequest.GetResponseAsync. Alternatively, you can use thread synchronization constructs, for example a Semaphore. The Semaphore works like a queue: it allows X threads to pass, and the rest wait until one of the busy threads finishes its work. First make your getStream method async (this is not a good solution, but it can be better), and then call it while holding the semaphore.
You should not use Parallel or Task.Run. Instead, you can write an async handler method and then use Task.WhenAll to await all the handlers. In the end there are several solutions for your scenario, but forget Parallel.ForEach and use an optimized solution instead.
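One way to combine the semaphore idea with an async handler and `Task.WhenAll` is sketched below. It uses `SemaphoreSlim` rather than `Semaphore` because `SemaphoreSlim` offers `WaitAsync`; that swap is an assumption on my part, not something the answer specifies. `ThrottledAsync`, `RunAsync`, and `handleAsync` are hypothetical names, with `handleAsync` standing in for an async download-and-save method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledAsync
{
    // Starts one async handler per URL, but lets at most maxConcurrent of them
    // run the handler body at any one time.
    public static async Task RunAsync(
        IEnumerable<string> urls, int maxConcurrent, Func<string, Task> handleAsync)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);

        async Task HandleThrottledAsync(string url)
        {
            await gate.WaitAsync();          // wait for a free slot without blocking a thread
            try { await handleAsync(url); }
            finally { gate.Release(); }      // free the slot even if the handler throws
        }

        // ToList creates all the tasks up front so they can be awaited together.
        var tasks = urls.Select(HandleThrottledAsync).ToList();
        await Task.WhenAll(tasks);
    }
}
```

No thread is parked while a request is pending: handlers that cannot enter the semaphore simply stay suspended until a slot is released.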
There are several problems with this code:
Parallel.ForEach is meant for data parallelism, not IO. The code is freezing all CPU cores waiting for IO to complete.
There are several ways to execute many IO operations concurrently in .NET Core.
.NET 6
In the current Long-Term-Support version of .NET, .NET 6, this can be done using Parallel.ForEachAsync. Scott Hanselman shows how easy it is to use it for API calls.
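As a rough sketch of that approach (requires .NET 6 or later), the loop below runs an async handler per URL with a bounded degree of parallelism. `ForEachAsyncSketch`, `RunAsync`, and `handleAsync` are hypothetical names used only for illustration; `handleAsync` stands in for whatever downloads and stores one image.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Processes every URL with at most maxParallel handlers running concurrently.
    // handleAsync is a hypothetical stand-in for the per-item download-and-save work.
    public static Task RunAsync(
        IEnumerable<string> urls, int maxParallel,
        Func<string, CancellationToken, Task> handleAsync)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        return Parallel.ForEachAsync(urls, options,
            async (url, ct) => await handleAsync(url, ct));
    }
}
```

The cancellation token passed to the handler is signalled if another iteration throws, so a handler that forwards it to its HTTP calls can stop early.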
You can retrieve the data directly with HttpClient.GetByteArrayAsync. A better option would be to retrieve the data as a stream and send it directly to the blob.
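The streaming variant might be sketched as follows. `StreamingCopy`, `CopyAsync`, and the `uploadAsync` delegate are hypothetical; `uploadAsync` stands in for the blob upload call, which depends on the storage SDK in use and is not shown in the question.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class StreamingCopy
{
    // Downloads url and hands the still-unread response stream to uploadAsync,
    // so this method never buffers the image into a byte[].
    // uploadAsync is a hypothetical stand-in for the blob upload call.
    public static async Task CopyAsync(
        HttpClient client, string url,
        Func<Stream, CancellationToken, Task> uploadAsync,
        CancellationToken ct = default)
    {
        // Return as soon as the headers arrive; the body is read on demand.
        using HttpResponseMessage response =
            await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
        response.EnsureSuccessStatusCode();

        using Stream body = await response.Content.ReadAsStreamAsync();
        await uploadAsync(body, ct);
    }
}
```

A single shared `HttpClient` instance would normally be passed in for all downloads, since creating one per request can exhaust sockets under load.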
HttpCompletionOption.ResponseHeadersRead causes GetAsync to return as soon as the response headers are received, without buffering any of the response data.
.NET Core 3.1
In older .NET Core versions (which are reaching End-of-Life in a few months), you can use, for example, an ActionBlock with a degree of parallelism greater than 1.
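A minimal sketch of the ActionBlock approach, under the assumption that the `System.Threading.Tasks.Dataflow` library is available to the project (on some target frameworks it must be added as a NuGet package). `DataflowSketch`, `RunAsync`, and `handleAsync` are hypothetical names; `handleAsync` stands in for the per-image work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Feeds every URL to an ActionBlock that runs at most maxParallel handlers at once.
    public static async Task RunAsync(
        IEnumerable<string> urls, int maxParallel, Func<string, Task> handleAsync)
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel };
        var block = new ActionBlock<string>(handleAsync, options);

        foreach (var url in urls)
            block.Post(url);      // queue the item; the input buffer is unbounded by default

        block.Complete();         // signal that no more items will be posted
        await block.Completion;   // wait until every queued item has been handled
    }
}
```

If a handler throws, the block faults and awaiting `Completion` rethrows the failure, so per-item error handling would go inside `handleAsync`.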
The block classes in the TPL Dataflow library can be used to construct processing pipelines similar to a shell script pipeline, with each block piping its output to the next block.