调用异步在C#中的并行线程上函数

发布于 2025-02-06 23:26:51 字数 1998 浏览 1 评论 0原文

我有一个三个异步函数,我想同时并行从多个线程调用。到现在为止,我已经尝试了以下方法 -

int numOfThreads = 4; 
var taskList = List<Task>(); 
using(fs = new FileStream(inputFilePath, FileMode.OpenOrCreate,FileAccess.ReadWrite,FileShare.ReadWrite))
{
    for(int i=1; i<= numOfThreads ; i++) 
   {
      taskList.Add(Task.Run( async() => {
       byte[] buffer = new byte[length]; // length could be upto a few thousand
       await Function1Async(); // Reads from the file into a byte array
       long result = await Function2Aync(); // Does some async operation with that byte array data
       await Function3Async(result); // Writes the result into the file 
      }
   }
}
Task.WaitAll(taskList.toArray());  

但是,在执行结束之前,并非所有任务都完成。我在C#中线程的经验有限。我在代码中做错了什么?还是我应该采取另一种方法?

编辑 - 所以我对自己的方法进行了一些更改。我现在摆脱了函数3ASYNC -

for(int i=1;i<=numOfThreads; i++) 
{
   using(fs = new FileStream(----))
   {
      taskList.Add(Task.Run( async() => {
       byte[] buffer = new byte[length]; // length could be upto a few thousand
       await Function1Async(buffer); // Reads from the file into a byte array
       Stream data = new MemoryStream(buffer); 
       /** Write the Stream into a file and return 
        * the offset at which the write operation was done
        */
       long blockStartOffset = await Function2Aync(data); 

       Console.WriteLine($"Block written at - {blockStartOffset}");
      }
   }
}
Task.WaitAll(taskList.toArray());

现在所有线程似乎都可以完成,但是功能2ASYNC似乎将一些日语字符随机写入输出文件。我想这可能是一些线程问题? 这是函数2ASYNC的实现 - &gt;

public async Task<long> Function2Async(Stream data)
{
        long offset = getBlockOffset(); 
        using(var outputFs = new FileStream(fileName,
            FileMode.OpenOrCreate,
            FileAccess.ReadWrite,
            FileShare.ReadWrite))
        {
            outputFs.Seek(offset, SeekOrigin.Begin);
            await data.CopyToAsync(outputFs);     
        }
         return offset;
}

I have a three async function that I want to call from multiple threads in parallel at the same time. Till now I have tried the following approach -

int numOfThreads = 4; 
var taskList = List<Task>(); 
using(fs = new FileStream(inputFilePath, FileMode.OpenOrCreate,FileAccess.ReadWrite,FileShare.ReadWrite))
{
    for(int i=1; i<= numOfThreads ; i++) 
   {
      taskList.Add(Task.Run( async() => {
       byte[] buffer = new byte[length]; // length could be upto a few thousand
       await Function1Async(); // Reads from the file into a byte array
       long result = await Function2Aync(); // Does some async operation with that byte array data
       await Function3Async(result); // Writes the result into the file 
      }
   }
}
Task.WaitAll(taskList.toArray());  

However, not all of the tasks complete before the execution reaches an end. I have limited experience with threading in c#. What am I doing wrong in my code? Or should I take an alternative approach?

EDIT -
So I made some changes to my approach. I got rid of the Function3Async for now -

for(int i=1;i<=numOfThreads; i++) 
{
   using(fs = new FileStream(----))
   {
      taskList.Add(Task.Run( async() => {
       byte[] buffer = new byte[length]; // length could be upto a few thousand
       await Function1Async(buffer); // Reads from the file into a byte array
       Stream data = new MemoryStream(buffer); 
       /** Write the Stream into a file and return 
        * the offset at which the write operation was done
        */
       long blockStartOffset = await Function2Aync(data); 

       Console.WriteLine(
quot;Block written at - {blockStartOffset}");
      }
   }
}
Task.WaitAll(taskList.toArray());

Now all threads seem to proceed to completion but the Function2Async seems to randomly write some Japanese characters to the output file. I guess it is some threading issue perhaps?
Here is the implementation of the Function2Async ->

public async Task<long> Function2Async(Stream data)
{
        long offset = getBlockOffset(); 
        using(var outputFs = new FileStream(fileName,
            FileMode.OpenOrCreate,
            FileAccess.ReadWrite,
            FileShare.ReadWrite))
        {
            outputFs.Seek(offset, SeekOrigin.Begin);
            await data.CopyToAsync(outputFs);     
        }
         return offset;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

心作怪 2025-02-13 23:26:51

在您的示例中,您既未将fs也不将buffer纳入function> function1async,但是您的评论说它从fs读取为<代码>缓冲区,所以我假设这就是发生的事情。

您无法并行从流中读取。它不支持这一点。如果您找到支持它的人,那将是可怕的效率,因为那是硬盘存储的工作方式。如果是网络驱动器,甚至更糟。

从流中读取到您的缓冲区 first ,然后按顺序,然后让您的线程松动并运行逻辑。并行,在内存中已经存在的缓冲区上。

顺便说一句,如果您写入同一文件,则会遇到相同的问题。如果您每个缓冲区写入一个文件,那很好,否则,请顺序进行。

In your example you have passed neither fs nor buffer into Function1Async but your comment says it reads from fs into buffer, so I will assume that is what happens.

You cannot read from a stream in parallel. It does not support that. If you find one that supports it, it will be horribly inefficient, because that is how hard disk storage works. Even worse if it is a network drive.

Read from the stream into your buffers first and in sequence, then let your threads loose and run your logic. In parallel, on the already existing buffers in memory.

Writing by the way would have the same problem if you wrote to the same file. If you write to one file per buffer, that's fine, otherwise, do it sequentially.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文