Continuation tasks hang when using LimitedConcurrencyLevelTaskScheduler

Posted 2024-12-18 09:10:28


I've been working with the TPL in C# (.NET 4.0).

I have created a custom API to simplify creating web requests and downloading their contents (asynchronously, using continuation tasks). That part is working fine.

The problem occurs when I try to use the LimitedConcurrencyLevelTaskScheduler (found in the Samples for Parallel Programming and in the MSDN documentation for tasks) together with deferred task creation. If you're not familiar with that class, all it does is cap the number of tasks scheduled on it that can run concurrently at a number you choose.
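For readers without the sample class: a minimal sketch, assuming .NET 4.5 or later and C# 9 top-level statements, that uses the BCL's `ConcurrentExclusiveSchedulerPair.ConcurrentScheduler` as a stand-in for LimitedConcurrencyLevelTaskScheduler. It queues eight blocking tasks and records the peak number running at once, which should never exceed the limit. The names `limited`, `running` and `maxObserved` are illustrative only.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for LimitedConcurrencyLevelTaskScheduler: the concurrent half of a
// ConcurrentExclusiveSchedulerPair runs at most maxConcurrencyLevel tasks at once.
const int MaxConcurrency = 2;
TaskScheduler limited =
    new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, MaxConcurrency).ConcurrentScheduler;
var factory = new TaskFactory(limited);

int running = 0;     // tasks currently executing on the limited scheduler
int maxObserved = 0; // highest value of 'running' seen so far
object statsLock = new object();

Task[] tasks = Enumerable.Range(1, 8).Select(i => factory.StartNew(() =>
{
    int now = Interlocked.Increment(ref running);
    lock (statsLock) { maxObserved = Math.Max(maxObserved, now); }
    Thread.Sleep(50); // simulate blocking work that holds a slot
    Interlocked.Decrement(ref running);
})).ToArray();

Task.WaitAll(tasks);
Console.WriteLine($"Peak concurrency: {maxObserved} (limit {MaxConcurrency})");
```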

Basically, I want to defer the creation of each web request task chain until it runs inside a task scheduled by the LimitedConcurrencyLevelTaskScheduler, so that I can limit the number of concurrent downloads.

As suggested by the sage Stephen Toub, when deferring the creation of a Task, the best thing to do is to design your API to return a Func<Task> or Func<Task<TResult>>. I have done this.
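A minimal sketch of that deferred-creation pattern, assuming C# 9 top-level statements; `GetWork` is a hypothetical stand-in for the question's API. Obtaining the `Func<Task>` does no work; the task chain only comes into existence when the delegate is invoked.

```csharp
using System;
using System.Threading.Tasks;

int started = 0; // counts how many times the deferred work actually began

// Hypothetical API in the style described: it returns a Func<Task> instead of a
// Task, so calling it does not start any work.
Func<Task> GetWork(string name) => () =>
{
    started++;
    Console.WriteLine($"Starting {name}");
    return Task.Delay(10);
};

Func<Task> deferred = GetWork("download");
int startedBeforeInvoke = started; // nothing has run yet

Task running = deferred(); // the task chain is created only here
running.Wait();
Console.WriteLine($"Started {started} time(s)");
```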

Unfortunately, my program hangs after the first batch of concurrent tasks is scheduled. Say I limit my tasks to a concurrency level of 4: four tasks start, and then the program hangs. The tasks never complete.

I have created a minimal example to illustrate the problem. It uses file reads instead of a WebRequest, and limits the concurrency level to 1.

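using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// LimitedConcurrencyLevelTaskScheduler is the sample class from the Parallel
// Extensions samples and the MSDN TaskScheduler documentation, not part of the BCL.
class Program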
{
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");

            FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);

            byte[] buffer = new byte[32];

            Console.WriteLine("Beginning read.");
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
        };
    }

    static void Main()
    {
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);

        int[] range = {1, 2, 3};

        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                var task = getTask();
                task.Wait();
            });
        });

        Task.WaitAll(tasks.ToArray());
    }
}

To clarify what I mean by "it hangs", this is what the output looks like.

Opening file.
Beginning read.

And then nothing else is printed... forever.

Any clues on what is going on?


Answers (1)

淡淡離愁欲言轉身 2024-12-25 09:10:28


Good question!

Firstly, I'm not sure LimitedConcurrencyLevelTaskScheduler is the academically correct solution. For it to limit the number of concurrent requests to N, you have to block N tasks, which rather defeats the purpose of using APM async calls in the first place.

Having said that, it is a whole lot easier to implement than the alternative, which needs a work queue and a count of in-flight requests, creating worker tasks as slots free up. That's not trivial to get right, and if the number N of concurrent requests is small, having N blocked threads is not the end of the world.
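The work-queue alternative mentioned above could look roughly like this. It is a sketch under stated assumptions (.NET 4.6+ for `Task.FromException` and `TaskCreationOptions.RunContinuationsAsynchronously`, C# 9 top-level statements), and `RunThrottled` is a hypothetical helper, not a library API. Each deferred `Func<Task>` is invoked only when a slot frees up, and no thread blocks while operations are in flight. Failures are ignored to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a queue of deferred operations plus a count of in-flight
// ones; a new operation is started only when a slot frees up.
Task RunThrottled(IEnumerable<Func<Task>> work, int maxInFlight)
{
    if (maxInFlight < 1) throw new ArgumentOutOfRangeException(nameof(maxInFlight));
    var pending = new Queue<Func<Task>>(work);
    var gate = new object();
    var allDone = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    int inFlight = 0;

    void Pump()
    {
        var toStart = new List<Func<Task>>();
        lock (gate)
        {
            while (inFlight < maxInFlight && pending.Count > 0)
            {
                toStart.Add(pending.Dequeue());
                inFlight++;
            }
            if (inFlight == 0 && pending.Count == 0)
                allDone.TrySetResult(true);
        }

        foreach (Func<Task> start in toStart)
        {
            Task op;
            try { op = start(); } // the task chain is created only now
            catch (Exception ex) { op = Task.FromException(ex); }

            // Free the slot and pump again on the thread pool, whatever the outcome.
            op.ContinueWith(_ =>
            {
                lock (gate) { inFlight--; }
                Pump();
            }, TaskScheduler.Default);
        }
    }

    Pump();
    return allDone.Task;
}

// Usage: six simulated downloads, at most two in flight at a time.
int current = 0, peak = 0, completed = 0;
object statsLock = new object();

var work = Enumerable.Range(1, 6).Select(i => (Func<Task>)(async () =>
{
    int now = Interlocked.Increment(ref current);
    lock (statsLock) { peak = Math.Max(peak, now); }
    await Task.Delay(20); // stands in for an async download
    Interlocked.Decrement(ref current);
    Interlocked.Increment(ref completed);
}));

RunThrottled(work, maxInFlight: 2).Wait();
Console.WriteLine($"Completed {completed}, peak in flight {peak}");
```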

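So, the problem with your code is that tasks created inside another task with Task.Factory.StartNew or ContinueWith, without an explicit scheduler, use the scheduler of the task that is currently running (TaskScheduler.Current). Tasks created with FromAsync are the exception: they aren't queued to a scheduler at all, but complete when the underlying APM operation calls back.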
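To see which scheduler a task created inside another task picks up, the following sketch (assuming .NET 4.6+ and C# 9 top-level statements, with `ConcurrentExclusiveSchedulerPair` standing in for LimitedConcurrencyLevelTaskScheduler) records `TaskScheduler.Current` outside any task, inside a task started on the limited scheduler, and inside a `ContinueWith` created there without a scheduler argument.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for LimitedConcurrencyLevelTaskScheduler(1).
TaskScheduler limited =
    new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

TaskScheduler outsideAnyTask = TaskScheduler.Current; // Main is not running inside a task
// Placeholders, overwritten by the tasks below.
TaskScheduler insideOuter = TaskScheduler.Default;
TaskScheduler continuationRanOn = TaskScheduler.Default;
Task continuation = Task.CompletedTask;

Task outer = Task.Factory.StartNew(() =>
{
    insideOuter = TaskScheduler.Current;

    // No scheduler argument, so the continuation is queued to TaskScheduler.Current,
    // i.e. 'limited'. It is not waited on here, so it simply runs once the slot frees up.
    continuation = Task.CompletedTask.ContinueWith(
        _ => { continuationRanOn = TaskScheduler.Current; });
}, CancellationToken.None, TaskCreationOptions.None, limited);

outer.Wait();
continuation.Wait();

Console.WriteLine($"Outside any task, Current is Default: {outsideAnyTask == TaskScheduler.Default}");
Console.WriteLine($"Inside the outer task, Current is limited: {insideOuter == limited}");
Console.WriteLine($"Continuation ran on limited: {continuationRanOn == limited}");
```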

You create tasks in Main with:

return factory.StartNew( () =>
    {
        var task = getTask();
        task.Wait();
    }
);

factory uses LimitedConcurrencyLevelTaskScheduler( 1 ), so only one of these tasks can execute at a time, and that one blocks waiting on the task returned from getTask().

So, in GetReadTask you call Task<int>.Factory.FromAsync. The read runs normally, because a FromAsync task is never queued to the current scheduler.

Then you create a continuation with .ContinueWith(task => fileStream.Close()). Because no scheduler is passed, it is queued to TaskScheduler.Current, which inside the outer task is the LimitedConcurrencyLevelTaskScheduler. That scheduler's only slot is already taken by the blocked task from Main, so the continuation can never run while the outer task waits for it: a deadlock.
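The deadlock can be reproduced without a file. In this sketch (assuming .NET 4.5+ and C# 9 top-level statements; `ConcurrentExclusiveSchedulerPair` with one slot stands in for LimitedConcurrencyLevelTaskScheduler(1), and `Task.Run(() => Thread.Sleep(50))` stands in for the FromAsync read), the outer task waits with a timeout instead of forever, so the starvation shows up as `Wait` returning false rather than as a hang.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// One slot, like LimitedConcurrencyLevelTaskScheduler(1) in the question.
TaskScheduler limited =
    new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

bool continuationFinishedInTime = true;

Task outer = Task.Factory.StartNew(() =>
{
    // Stands in for the FromAsync read: Task.Run always uses the thread pool.
    Task read = Task.Run(() => Thread.Sleep(50));

    // No scheduler given, so this is queued to TaskScheduler.Current, i.e. 'limited'.
    Task close = read.ContinueWith(_ => Console.WriteLine("Closing file."));

    // This task holds the only slot, so 'close' cannot start while we wait.
    // The question's code called Wait() with no timeout and hung forever here.
    continuationFinishedInTime = close.Wait(TimeSpan.FromSeconds(1));
}, CancellationToken.None, TaskCreationOptions.None, limited);

outer.Wait();
Console.WriteLine($"Continuation finished while the outer task waited: {continuationFinishedInTime}");
```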

The solution is to run the continuation on a normal thread pool thread by passing TaskScheduler.Default to ContinueWith. The continuation then gets to run, and the deadlock is broken.
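A sketch of the same reproduction with the fix applied, under the same assumptions (.NET 4.5+, C# 9 top-level statements, `ConcurrentExclusiveSchedulerPair` as the stand-in scheduler, `Task.Run` as the stand-in read). Only the `TaskScheduler.Default` argument to `ContinueWith` changes, and the outer task's wait now succeeds even though it still holds the limited scheduler's only slot.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

TaskScheduler limited =
    new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

bool continuationFinished = false;

Task outer = Task.Factory.StartNew(() =>
{
    // Stands in for the FromAsync read: Task.Run always uses the thread pool.
    Task read = Task.Run(() => Thread.Sleep(50));

    // The fix: queue the continuation to the thread pool, not to TaskScheduler.Current.
    Task close = read.ContinueWith(
        _ => Console.WriteLine("Closing file."), TaskScheduler.Default);

    // Still blocks this task's slot on 'limited', but the continuation no longer needs it.
    // The timeout is only there so that a mistake cannot hang the sketch.
    continuationFinished = close.Wait(TimeSpan.FromSeconds(5));
}, CancellationToken.None, TaskCreationOptions.None, limited);

outer.Wait();
Console.WriteLine($"Continuation finished: {continuationFinished}");
```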

Here's my solution:

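using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Minimal stand-in for the Output logging helper used below; the original helper isn't shown.
static class Output
{
    public static void Write( string message ) { Console.WriteLine( message ); }
}

static Task QueueReadTask( TaskScheduler ts, int number )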
{
    Output.Write( "QueueReadTask( " + number + " )" );

    return Task.Factory.StartNew( () =>
        {
            Output.Write( "Opening file " + number + "." );

            FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );

            byte[] buffer = new byte[ 32 ];

            var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );

            var tClose = tRead.ContinueWith( task =>
                    {
                        Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
                        fileStream.Close();
                    }
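                    , TaskScheduler.Default   // run the close on the thread pool, not on ts
                );

            tClose.Wait();   // blocks this task's slot on ts until the file is closed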
        }
        , CancellationToken.None
        , TaskCreationOptions.None
        , ts
    );
}

And Main now looks like this:

static void Main()
{
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );

    int[] range = { 1, 2, 3 };

    var tasks = range.Select( number =>
        {
            var task = QueueReadTask( ts, number );

            return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
        }
    )
    .ToArray();

    Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );

    Task.WaitAll( tasks );

    Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}

There are a couple of things to note:

Moving task.Wait() into QueueReadTask makes it more obvious that you are blocking inside a task. You could remove the FromAsync call and the continuation and replace them with a normal synchronous read, since you are blocking anyway.
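A sketch of that synchronous variant, assuming .NET 4.5+ and C# 9 top-level statements. `QueueSyncReadTask` is a hypothetical name, a temporary 10-byte file replaces the answer's D:\1KB.txt, and `ConcurrentExclusiveSchedulerPair` again stands in for LimitedConcurrencyLevelTaskScheduler. The task still occupies one slot on the limited scheduler while it reads, but there is no continuation left to starve.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

TaskScheduler limited =
    new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

// Hypothetical test input: a temporary file holding 10 bytes.
string path = Path.GetTempFileName();
File.WriteAllBytes(path, new byte[10]);

// Synchronous variant of QueueReadTask: a plain blocking read inside the task,
// instead of FromAsync + ContinueWith + Wait.
Task<int> QueueSyncReadTask(TaskScheduler ts, string file) =>
    Task.Factory.StartNew(() =>
    {
        using (FileStream fs = File.Open(file, FileMode.Open, FileAccess.Read, FileShare.Read))
        {
            byte[] buffer = new byte[32];
            return fs.Read(buffer, 0, buffer.Length); // number of bytes actually read
        }
    }, CancellationToken.None, TaskCreationOptions.None, ts);

Task<int>[] reads = { QueueSyncReadTask(limited, path), QueueSyncReadTask(limited, path) };
Task.WaitAll(reads);
File.Delete(path);
Console.WriteLine($"Read {reads[0].Result} and {reads[1].Result} bytes");
```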

The task returned from QueueReadTask can have continuations. When no scheduler is passed, ContinueWith uses TaskScheduler.Current at the point where ContinueWith is called, not the antecedent task's scheduler. Main isn't running inside a task, so Current is the default scheduler and these continuations run on the thread pool.
