.NET task returning an object and calling an async method internally

Posted 2025-02-03 06:35:08 · 2 views · 0 comments


I have some tasks executing in a WhenAll(). I get a semantic error if a task returns an object and calls an async method inside its Run(). The async method fetches some string content from Blob storage, then constructs and returns an object.

Do you know how to solve this issue, while maintaining the batch download done by tasks?
I need a list with those FinalWrapperObjects.

Error message

Cannot convert async lambda expression to delegate type
'Func&lt;FinalWrapperObject&gt;'. An async lambda expression may return
void, Task or Task&lt;T&gt;, none of which are convertible to
'Func&lt;FinalWrapperObject&gt;'.

...
List<FinalWrapperObject> finalReturns = new List<FinalWrapperObject>();
List<Task<FinalWrapperObject>> tasks = new List<Task<FinalWrapperObject>>();
var resultsBatch = fetchedObjects.Skip(i).Take(10).ToList();

foreach (var resultBatchItem in resultsBatch)
{
    tasks.Add(
        new Task<FinalWrapperObject>(async () => //!! errors here on arrow
        {
            var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
            return new FinalWrapperObject {
                BlobContent = blobContent,
                CreationDateTime = resultBatchItem.CreationDateTime
            };
        })
    );
}

FinalWrapperObject[] listFinalWrapperObjects = await Task.WhenAll(tasks);
finalReturns.AddRange(listFinalWrapperObjects);

return finalReturns;
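For context, the error is a delegate-type mismatch: the `Task<TResult>` constructor expects a synchronous `Func<TResult>`, while an async lambda compiles to a `Func<Task<TResult>>`. A minimal sketch of the distinction, reusing the question's `FinalWrapperObject` (`FetchAsync` is a hypothetical stand-in):

```csharp
// A synchronous factory: the only shape new Task<FinalWrapperObject>(...) accepts.
Func<FinalWrapperObject> syncFactory = () => new FinalWrapperObject();

// An async lambda compiles to Func<Task<FinalWrapperObject>> instead,
// which is why passing it to the Task constructor fails to compile.
Func<Task<FinalWrapperObject>> asyncFactory =
    async () => await FetchAsync(); // FetchAsync is hypothetical

// Task.Run has an overload taking Func<Task<TResult>> that unwraps
// the inner task, so it does accept async lambdas.
Task<FinalWrapperObject> t = Task.Run(asyncFactory);
```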


Comments (1)

明媚如初 2025-02-10 06:35:08

Your code never starts any tasks. Tasks aren't threads anyway. They're a promise that something will complete and maybe produce a value in the future. Some tasks require a thread to run; these are executed on threads from the ThreadPool. Others, e.g. async IO operations, don't require a thread at all. Downloading a blob is such an IO operation.

Your lambda is asynchronous and already returns a Task, so there's no reason to use Task.Run. You can execute it once per item, collect the tasks in a list and await all of them. That's the bare-bones way:

async Task<FinalWrapperObject> UploadItemAsync(BatchItem resultBatchItem)
{
    var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
    return new FinalWrapperObject {
        BlobContent = blobContent,
        CreationDateTime = resultBatchItem.CreationDateTime
    };
}

...

var tasks = resultsBatch.Select(UploadItemAsync).ToList();
var results = await Task.WhenAll(tasks);
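Applied to the question's batching loop, the same pattern replaces the Task constructor entirely. A sketch reusing the question's `azureBlobService`, `fetchedObjects` and `finalReturns`:

```csharp
// Calling UploadItemAsync starts one already-running task per item;
// no Task constructor and no Task.Run needed for IO-bound work.
var tasks = new List<Task<FinalWrapperObject>>();
foreach (var resultBatchItem in fetchedObjects.Skip(i).Take(10))
{
    tasks.Add(UploadItemAsync(resultBatchItem));
}
FinalWrapperObject[] listFinalWrapperObjects = await Task.WhenAll(tasks);
finalReturns.AddRange(listFinalWrapperObjects);
```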

Using TPL Dataflow

A better option would be to use the TPL Dataflow classes to upload items concurrently and even construct a pipeline from processing blocks.

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
};
var results = new BufferBlock<FinalWrapperObject>();
var uploader = new TransformBlock<BatchItem, FinalWrapperObject>(UploadItemAsync, options);
uploader.LinkTo(results);

foreach (var item in fetchedObjects)
{
    // SendAsync is the awaitable extension method; blocks have no PostAsync.
    await uploader.SendAsync(item);
}
uploader.Complete();
await uploader.Completion;

By default, a block only processes one message at a time. With MaxDegreeOfParallelism = 10 we're telling it to process 10 items concurrently. This code will upload 10 items at a time, as long as there are items to post to the uploader block.

The results are forwarded to the results BufferBlock. The items can be extracted with TryReceiveAll:

IList<FinalWrapperObject> items;
results.TryReceiveAll(out items);

Dataflow blocks can be combined into a pipeline. You could have a block that loads items from disk, another to upload them and a final one that stores the response to another file or database:

var dop10 = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity = 4
};
var bounded = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 4
};

var loader = new TransformBlock<FileInfo, BatchItem>(LoadFile, bounded);
var uploader = new TransformBlock<BatchItem, FinalWrapperObject>(UploadItemAsync, dop10);
// ActionBlock requires a delegate; SaveResultAsync stands in for
// whatever stores the response.
var dbLogger = new ActionBlock<FinalWrapperObject>(SaveResultAsync, bounded);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loader.LinkTo(uploader, linkOptions);
uploader.LinkTo(dbLogger, linkOptions);

var folder = new DirectoryInfo(rootPath);
foreach (var item in folder.EnumerateFiles())
{
    await loader.SendAsync(item);
}
loader.Complete();

await dbLogger.Completion;

In this case, all files in a folder are posted to the loader block, which loads files one by one and forwards a BatchItem for each. The uploader uploads the file and the results are stored by dbLogger. In the end, we tell loader we're finished and wait for all items to be processed all the way to the end with await dbLogger.Completion.

BoundedCapacity puts a limit on how many items each block's input buffer can hold. This prevents loading all the files into memory at once.
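The pipeline sketch assumes a LoadFile method and a delegate for the final ActionBlock. A minimal, hypothetical sketch of such helpers, using the property names from the question:

```csharp
// Hypothetical helpers assumed by the pipeline above.
static BatchItem LoadFile(FileInfo file)
{
    // Wrap the file's metadata in a BatchItem; StoragePath and
    // CreationDateTime are the properties used in the question.
    return new BatchItem
    {
        StoragePath = file.FullName,
        CreationDateTime = file.CreationTimeUtc
    };
}

static async Task SaveResultAsync(FinalWrapperObject result)
{
    // Stand-in for storing the response in a file or database.
    await File.AppendAllTextAsync("results.log",
        result.CreationDateTime + ": " + result.BlobContent + Environment.NewLine);
}
```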
