I've been working on using the TPL in C# (.NET 4.0).
I have created a custom API to ease the creation of web requests and downloading the contents (asynchronously, using continuation tasks). That part is working fine.
The problem I have occurs when I try to use the `LimitedConcurrencyLevelTaskScheduler` (found in the Samples for Parallel Programming and in the MSDN documentation for tasks) with deferred task creation. If you're not familiar with that class, all it does is cap the number of tasks it runs concurrently at a number you choose.
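To make "cap the number of tasks it runs concurrently" concrete, here is a small sketch. It swaps in the BCL's `ConcurrentExclusiveSchedulerPair` (available from .NET 4.5, so not in .NET 4.0) as a stand-in for the sample `LimitedConcurrencyLevelTaskScheduler`; the helper `ConcurrencyDemo.MaxObservedConcurrency` is hypothetical. It queues a batch of tasks on the capped scheduler and records the highest number that were ever running at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    // Runs taskCount short tasks on a scheduler capped at maxConcurrency and
    // returns the highest number of tasks observed running at the same time.
    public static int MaxObservedConcurrency(int maxConcurrency, int taskCount)
    {
        // Stand-in for LimitedConcurrencyLevelTaskScheduler (assumes .NET 4.5+).
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0;
        int maxSeen = 0;

        Task[] tasks = Enumerable.Range(0, taskCount).Select(_ => factory.StartNew(() =>
        {
            int now = Interlocked.Increment(ref running);

            // Record the peak with a compare-and-swap loop.
            int seen = Volatile.Read(ref maxSeen);
            while (now > seen)
            {
                int previous = Interlocked.CompareExchange(ref maxSeen, now, seen);
                if (previous == seen) break;
                seen = previous;
            }

            Thread.Sleep(20); // simulated work, so tasks would overlap if allowed to
            Interlocked.Decrement(ref running);
        })).ToArray();

        Task.WaitAll(tasks);
        return maxSeen;
    }
}
```

With a cap of 1 the peak is always 1; with a cap of 2 it never exceeds 2, however many tasks are queued.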
Basically, I want to defer the creation of web request task chains into a task scheduled by the `LimitedConcurrencyLevelTaskScheduler`, so that I can limit the number of concurrent downloads.
As suggested by the sage Stephen Toub, when deferring the creation of a `Task`, the best thing to do is to design your API to return a `Func<Task>` or `Func<Task<TResult>>`. I have done this.
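The point of returning a `Func<Task<TResult>>` is that nothing starts until the delegate is invoked. A minimal sketch of that API shape, where `DeferredDownload.Create` and its `onStart` hook are hypothetical and a completed task plus a continuation stands in for a real web request (`Task.FromResult` needs .NET 4.5):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical API shaped the way the question describes: it hands back a
// Func<Task<int>> instead of a running task, so creation is deferred.
public static class DeferredDownload
{
    // onStart is an illustrative hook that fires when the work is actually created.
    public static Func<Task<int>> Create(int payload, Action onStart)
    {
        return () =>
        {
            onStart();
            // Stand-in for a web request: an already-completed task plus a continuation.
            return Task.FromResult(payload)
                       .ContinueWith(t => t.Result * 2, TaskScheduler.Default);
        };
    }
}
```

Calling `Create` only builds the delegate; the task chain is created when the delegate is invoked, which is what lets a scheduled task decide when that happens.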
Unfortunately, my program hangs after scheduling the first set of concurrent tasks. Say I limit the scheduler to a concurrency level of 4: 4 tasks start and then the program hangs. The tasks never complete.
I have created a minimal example to illustrate the problem. I am using file reads instead of a `WebRequest`, and I have limited the concurrency level to 1.
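```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// LimitedConcurrencyLevelTaskScheduler is not part of the BCL; it comes from the
// Samples for Parallel Programming / MSDN task documentation.
class Program
{
    // Deferred creation: nothing is opened or read until the returned delegate is invoked.
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");
            FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);
            byte[] buffer = new byte[32];
            Console.WriteLine("Beginning read.");
            // Read asynchronously via APM, then close the file in a continuation.
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null)
                .ContinueWith(task => fileStream.Close());
        };
    }

    static void Main()
    {
        // Allow at most one task at a time on this scheduler.
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);
        int[] range = { 1, 2, 3 };
        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                // Create the read task chain inside the limited scheduler and block until it finishes.
                var task = getTask();
                task.Wait();
            });
        });
        Task.WaitAll(tasks.ToArray());
    }
}
```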
To clarify what I mean by "it hangs", this is what the output looks like.
```
Opening file.
Beginning read.
```
And then nothing else is printed... forever.
Any clues on what is going on?
Good question!
Firstly, I'm not sure `LimitedConcurrencyLevelTaskScheduler` is the academically correct solution. For it to limit the number of concurrent requests to N, you have to block N tasks, which somewhat defeats the purpose of using APM async calls in the first place. Having said that, it is a whole lot easier to implement than the alternative: you would need a work queue and a count of the in-flight requests, and then create worker tasks as required. That's not trivial to get right, and if the number N of concurrent requests is small, having N blocked threads is not the end of the world.
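The non-blocking alternative described above might look roughly like the following hypothetical `ThrottledQueue`. It is a sketch under stated assumptions, not production code: it holds pending `Func<Task>` factories, counts in-flight tasks, and starts the next item from each completion's continuation, so no thread sits blocked while waiting. It assumes each factory returns a task rather than throwing synchronously, treats a faulted task simply as finished, and has no cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: starts deferred work (Func<Task>) with at most
// maxInFlight tasks outstanding, without blocking any thread while waiting.
public sealed class ThrottledQueue
{
    private readonly object _gate = new object();
    private readonly Queue<Func<Task>> _pending = new Queue<Func<Task>>();
    private readonly int _maxInFlight;
    private int _inFlight;
    private int _peakInFlight;

    public ThrottledQueue(int maxInFlight)
    {
        if (maxInFlight < 1) throw new ArgumentOutOfRangeException(nameof(maxInFlight));
        _maxInFlight = maxInFlight;
    }

    // Highest number of tasks that were in flight at the same time.
    public int PeakInFlight
    {
        get { lock (_gate) return _peakInFlight; }
    }

    public void Enqueue(Func<Task> work)
    {
        lock (_gate) _pending.Enqueue(work);
        Pump();
    }

    // Starts queued work while there is spare capacity.
    private void Pump()
    {
        while (true)
        {
            Func<Task> next;
            lock (_gate)
            {
                if (_inFlight >= _maxInFlight || _pending.Count == 0) return;
                next = _pending.Dequeue();
                _inFlight++;
                _peakInFlight = Math.Max(_peakInFlight, _inFlight);
            }

            // The task chain is only created now that a slot is free.
            // Assumption: next() returns a task rather than throwing.
            Task running = next();

            // Free the slot when the work finishes (successfully or not) and start
            // the next item; this runs on the thread pool, not a limited scheduler.
            running.ContinueWith(_ =>
            {
                lock (_gate) _inFlight--;
                Pump();
            }, TaskScheduler.Default);
        }
    }
}
```

The trade-off the answer mentions shows up here: the bookkeeping (the lock, the counter, restarting from continuations) is the part that is easy to get wrong, whereas the blocking scheduler approach needs none of it.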
So, the problem with your code is that tasks created within other tasks use the scheduler from the parent task: `StartNew` and `ContinueWith` called without an explicit scheduler use `TaskScheduler.Current`, which inside a running task is that task's scheduler. Actually, that's not true for tasks created with `FromAsync`, as these use the underlying APM implementation and so are a bit different.
You create tasks in `Main` with `factory.StartNew`, and `factory` uses the `LimitedConcurrencyLevelTaskScheduler(1)`, so only 1 of these tasks can execute at a time, and that one is waiting on the task returned from `getTask()`.
So, in `GetReadTask` you call `Task<int>.Factory.FromAsync`. This runs because `FromAsync` doesn't respect the parent task's scheduler. Then you create a continuation with `.ContinueWith(task => fileStream.Close())`. This creates a task that does respect its parent's scheduler. Since the `LimitedConcurrencyLevelTaskScheduler` is already executing a task (the one in `Main` that's blocked), the continuation cannot run and you have a deadlock.
The solution is to run the continuation on a normal thread pool thread by passing `TaskScheduler.Default` to `ContinueWith`. It then gets to run and the deadlock is broken. Here's my solution:
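As a self-contained sketch of both the deadlock and the `TaskScheduler.Default` fix (not the answerer's code): it swaps in `ConcurrentExclusiveSchedulerPair` (.NET 4.5+) for `LimitedConcurrencyLevelTaskScheduler(1)`, replaces the `FromAsync` file read with `Task.Delay` (another task that completes without being queued to the limited scheduler), and waits with a timeout instead of an unbounded `Wait()` so the demo reports the deadlock rather than hanging. `DeadlockDemo.InnerFinished` is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

public static class DeadlockDemo
{
    // Returns true if the continuation finished while the outer task was waiting on it.
    public static bool InnerFinished(bool continueOnDefaultScheduler)
    {
        // Stand-in for LimitedConcurrencyLevelTaskScheduler(1): one task at a time.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        Task<bool> outer = factory.StartNew(() =>
        {
            // Like FromAsync, Task.Delay completes without being queued to our scheduler.
            Task io = Task.Delay(200);

            // Without an explicit scheduler, ContinueWith uses TaskScheduler.Current,
            // i.e. the limited scheduler whose only slot this outer task is holding.
            Task inner = continueOnDefaultScheduler
                ? io.ContinueWith(_ => { }, TaskScheduler.Default)
                : io.ContinueWith(_ => { });

            // The question uses an unbounded Wait(); a timeout lets the demo report
            // the deadlock instead of hanging forever.
            return inner.Wait(TimeSpan.FromSeconds(2));
        });

        return outer.Result;
    }
}
```

`InnerFinished(false)` returns `false` because the continuation is queued behind the blocked outer task; `InnerFinished(true)` returns `true` because the continuation runs on the thread pool.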
And `Main` now looks like this:
There are a couple of things to note:
- Moving the `task.Wait()` into `QueueReadTask` makes it more obvious that you are blocking a task. You can remove the `FromAsync` call and the continuation and replace them with a normal synchronous call, since you are blocking anyway.
- The task returned from `QueueReadTask` can have continuations. By default, these run under the scheduler that is current where `ContinueWith` is called (the parent task's scheduler), not the antecedent's scheduler. In this case there is no parent task, so the default scheduler is used.
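The last point can be checked with a small sketch (hypothetical `SchedulerCheck.ContinuationRunsOnDefault`, again using `ConcurrentExclusiveSchedulerPair`, .NET 4.5+, as the limited scheduler). A task is started on the limited scheduler, and a continuation is attached from ordinary code outside any task; inside that continuation, `TaskScheduler.Current` is `TaskScheduler.Default`, not the antecedent's scheduler.

```csharp
using System;
using System.Threading.Tasks;

public static class SchedulerCheck
{
    public static bool ContinuationRunsOnDefault()
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1);
        TaskScheduler limited = pair.ConcurrentScheduler;

        // The antecedent runs on the limited scheduler...
        Task antecedent = new TaskFactory(limited).StartNew(() => { });

        // ...but ContinueWith is called here, outside any task, so it picks up
        // TaskScheduler.Current, which is TaskScheduler.Default at this point.
        Task<bool> continuation =
            antecedent.ContinueWith(_ => TaskScheduler.Current == TaskScheduler.Default);

        return continuation.Result;
    }
}
```

This assumes the method is called from ordinary code (for example `Main`), not from inside a task running on a custom scheduler, where `TaskScheduler.Current` would be that scheduler instead.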