调用异步在C#中的并行线程上函数
我有一个三个异步函数,我想同时并行从多个线程调用。到现在为止,我已经尝试了以下方法 -
int numOfThreads = 4;
var taskList = List<Task>();
using(fs = new FileStream(inputFilePath, FileMode.OpenOrCreate,FileAccess.ReadWrite,FileShare.ReadWrite))
{
for(int i=1; i<= numOfThreads ; i++)
{
taskList.Add(Task.Run( async() => {
byte[] buffer = new byte[length]; // length could be upto a few thousand
await Function1Async(); // Reads from the file into a byte array
long result = await Function2Aync(); // Does some async operation with that byte array data
await Function3Async(result); // Writes the result into the file
}
}
}
Task.WaitAll(taskList.toArray());
但是,在执行结束之前,并非所有任务都完成。我在C#中线程的经验有限。我在代码中做错了什么?还是我应该采取另一种方法?
编辑 - 所以我对自己的方法进行了一些更改。我现在摆脱了函数3ASYNC -
for(int i=1;i<=numOfThreads; i++)
{
using(fs = new FileStream(----))
{
taskList.Add(Task.Run( async() => {
byte[] buffer = new byte[length]; // length could be upto a few thousand
await Function1Async(buffer); // Reads from the file into a byte array
Stream data = new MemoryStream(buffer);
/** Write the Stream into a file and return
* the offset at which the write operation was done
*/
long blockStartOffset = await Function2Aync(data);
Console.WriteLine($"Block written at - {blockStartOffset}");
}
}
}
Task.WaitAll(taskList.toArray());
现在所有线程似乎都可以完成,但是功能2ASYNC似乎将一些日语字符随机写入输出文件。我想这可能是一些线程问题? 这是函数2ASYNC的实现 - &gt;
public async Task<long> Function2Async(Stream data)
{
long offset = getBlockOffset();
using(var outputFs = new FileStream(fileName,
FileMode.OpenOrCreate,
FileAccess.ReadWrite,
FileShare.ReadWrite))
{
outputFs.Seek(offset, SeekOrigin.Begin);
await data.CopyToAsync(outputFs);
}
return offset;
}
I have a three async function that I want to call from multiple threads in parallel at the same time. Till now I have tried the following approach -
int numOfThreads = 4;
var taskList = List<Task>();
using(fs = new FileStream(inputFilePath, FileMode.OpenOrCreate,FileAccess.ReadWrite,FileShare.ReadWrite))
{
for(int i=1; i<= numOfThreads ; i++)
{
taskList.Add(Task.Run( async() => {
byte[] buffer = new byte[length]; // length could be upto a few thousand
await Function1Async(); // Reads from the file into a byte array
long result = await Function2Aync(); // Does some async operation with that byte array data
await Function3Async(result); // Writes the result into the file
}
}
}
Task.WaitAll(taskList.toArray());
However, not all of the tasks complete before the execution reaches an end. I have limited experience with threading in c#. What am I doing wrong in my code? Or should I take an alternative approach?
EDIT -
So I made some changes to my approach. I got rid of the Function3Async for now -
for(int i=1;i<=numOfThreads; i++)
{
using(fs = new FileStream(----))
{
taskList.Add(Task.Run( async() => {
byte[] buffer = new byte[length]; // length could be upto a few thousand
await Function1Async(buffer); // Reads from the file into a byte array
Stream data = new MemoryStream(buffer);
/** Write the Stream into a file and return
* the offset at which the write operation was done
*/
long blockStartOffset = await Function2Aync(data);
Console.WriteLine(quot;Block written at - {blockStartOffset}");
}
}
}
Task.WaitAll(taskList.toArray());
Now all threads seem to proceed to completion but the Function2Async seems to randomly write some Japanese characters to the output file. I guess it is some threading issue perhaps?
Here is the implementation of the Function2Async ->
public async Task<long> Function2Async(Stream data)
{
long offset = getBlockOffset();
using(var outputFs = new FileStream(fileName,
FileMode.OpenOrCreate,
FileAccess.ReadWrite,
FileShare.ReadWrite))
{
outputFs.Seek(offset, SeekOrigin.Begin);
await data.CopyToAsync(outputFs);
}
return offset;
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在您的示例中,您既未将
fs
也不将buffer
纳入function> function1async
,但是您的评论说它从fs
读取为<代码>缓冲区,所以我假设这就是发生的事情。您无法并行从流中读取。它不支持这一点。如果您找到支持它的人,那将是可怕的效率,因为那是硬盘存储的工作方式。如果是网络驱动器,甚至更糟。
从流中读取到您的缓冲区 first ,然后按顺序,然后让您的线程松动并运行逻辑。并行,在内存中已经存在的缓冲区上。
顺便说一句,如果您写入同一文件,则会遇到相同的问题。如果您每个缓冲区写入一个文件,那很好,否则,请顺序进行。
In your example you have passed neither
fs
norbuffer
intoFunction1Async
but your comment says it reads fromfs
intobuffer
, so I will assume that is what happens.You cannot read from a stream in parallel. It does not support that. If you find one that supports it, it will be horribly inefficient, because that is how hard disk storage works. Even worse if it is a network drive.
Read from the stream into your buffers first and in sequence, then let your threads loose and run your logic. In parallel, on the already existing buffers in memory.
Writing by the way would have the same problem if you wrote to the same file. If you write to one file per buffer, that's fine, otherwise, do it sequentially.