混合 ExecutionContext.SuppressFlow 和任务时 AsyncLocal.Value 出现意外值

发布于 2025-01-12 17:04:19 字数 2717 浏览 3 评论 0 原文

在应用程序中,由于 AsyncLocal 的错误/意外值,我遇到了奇怪的行为:尽管我抑制了执行上下文的流程,但 AsyncLocal.Value 属性有时不会在新生成的任务的执行范围内重置。

下面我创建了一个最小的可重现示例来演示该问题:

private static readonly AsyncLocal<object> AsyncLocal = new AsyncLocal<object>();
[TestMethod]
public void Test()
{
    Trace.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
        {
            AsyncLocal.Value = "1";
            Task anotherTask;
            using (ExecutionContext.SuppressFlow())
            {
                anotherTask = Task.Run(() =>
                    {
                        Trace.WriteLine(AsyncLocal.Value); // "1" <- ???
                        Assert.IsNull(AsyncLocal.Value); // BOOM - FAILS
                        AsyncLocal.Value = "2";
                    });
            }

            Task.WaitAll(anotherTask);
        });

    mainTask.Wait(500000, CancellationToken.None);
}

在十分之九的运行中(在我的电脑上),测试方法的结果是:

.NET 6.0.2
"1"

->测试失败

正如您所看到的,测试失败是因为在 Task.Run 中执行的操作中,之前的值仍然存在于 AsyncLocal.Value 中(消息:<代码>1)。

我的具体问题是:

  1. 为什么会发生这种情况? 我怀疑发生这种情况是因为 Task.Run 可能使用当前线程来执行工作负载。在这种情况下,我假设缺少 async/await-operators 不会强制为该操作创建新的/单独的 ExecutionContext。就像 Stephen Cleary 所说的“从逻辑调用上下文的角度来看,所有同步调用都是“折叠” - 它们实际上是调用堆栈上最接近的异步方法上下文的一部分”。如果是这样的话,我确实理解为什么在操作中使用相同的上下文。

这是对此行为的正确解释吗?另外,为什么它有时可以完美地工作(在我的机器上大约有十分之一)?

  1. 如何解决这个问题? 假设我的上述理论是正确的,那么强制引入一个新的异步“层”就足够了,如下所示:
private static readonly AsyncLocal<object> AsyncLocal = new AsyncLocal<object>();
[TestMethod]
public void Test()
{
    Trace.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
        {
            AsyncLocal.Value = "1";
            Task anotherTask;
            using (ExecutionContext.SuppressFlow())
            {
                var wrapper = () =>
                    {
                        Trace.WriteLine(AsyncLocal.Value);
                        Assert.IsNull(AsyncLocal.Value); 
                        AsyncLocal.Value = "2";
                        return Task.CompletedTask;
                    };

                anotherTask = Task.Run(async () => await wrapper());
            }

            Task.WaitAll(anotherTask);
        });

    mainTask.Wait(500000, CancellationToken.None);
}

这似乎解决了问题(它在我的机器上始终有效),但我想确保这是正确的解决这个问题。

预先非常感谢

In an application I am experiencing odd behavior due to wrong/unexpected values of AsyncLocal: Despite I suppressed the flow of the execution context, I the AsyncLocal.Value-property is sometimes not reset within the execution scope of a newly spawned Task.

Below I created a minimal reproducible sample which demonstrates the problem:

private static readonly AsyncLocal<object> AsyncLocal = new AsyncLocal<object>();
[TestMethod]
public void Test()
{
    Trace.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
        {
            AsyncLocal.Value = "1";
            Task anotherTask;
            using (ExecutionContext.SuppressFlow())
            {
                anotherTask = Task.Run(() =>
                    {
                        Trace.WriteLine(AsyncLocal.Value); // "1" <- ???
                        Assert.IsNull(AsyncLocal.Value); // BOOM - FAILS
                        AsyncLocal.Value = "2";
                    });
            }

            Task.WaitAll(anotherTask);
        });

    mainTask.Wait(500000, CancellationToken.None);
}

In nine out of ten runs (on my pc) the outcome of the Test-method is:

.NET 6.0.2
"1"

-> The test fails

As you can see the test fails because within the action which is executed within Task.Run the the previous value is still present within AsyncLocal.Value (Message: 1).

My concrete questions are:

  1. Why does this happen?
    I suspect this happens because Task.Run may use the current thread to execute the work load. In that case, I assume lack of async/await-operators does not force the creation of a new/separate ExecutionContext for the action. Like Stephen Cleary said "from the logical call context’s perspective, all synchronous invocations are “collapsed” - they’re actually part of the context of the closest async method further up the call stack". If that’s the case I do understand why the same context is used within the action.

Is this the correct explanation for this behavior? In addition, why does it work flawlessly sometimes (about 1 run out of 10 on my machine)?

  1. How can I fix this?
    Assuming that my theory above is true it should be enough to forcefully introduce a new async "layer", like below:
private static readonly AsyncLocal<object> AsyncLocal = new AsyncLocal<object>();
[TestMethod]
public void Test()
{
    Trace.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
        {
            AsyncLocal.Value = "1";
            Task anotherTask;
            using (ExecutionContext.SuppressFlow())
            {
                var wrapper = () =>
                    {
                        Trace.WriteLine(AsyncLocal.Value);
                        Assert.IsNull(AsyncLocal.Value); 
                        AsyncLocal.Value = "2";
                        return Task.CompletedTask;
                    };

                anotherTask = Task.Run(async () => await wrapper());
            }

            Task.WaitAll(anotherTask);
        });

    mainTask.Wait(500000, CancellationToken.None);
}

This seems to fix the problem (it consistently works on my machine), but I want to be sure that this is a correct fix for this problem.

Many thanks in advance

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

岁月静好 2025-01-19 17:04:19

为什么会发生这种情况?我怀疑发生这种情况是因为 Task.Run 可能使用当前线程来执行工作负载。

我怀疑发生这种情况是因为 Task.WaitAll 将使用当前线程来内联执行任务。

具体来说,Task.WaitAll调用Task.WaitAllCore,这将 尝试通过调用 href="https://github.com/dotnet/runtime/blob/4017327955f1d8ddc43980eb1848c52fbb131dfc/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs#L2870" rel="noreferrer">Task.WrappedTryRunInline。我假设自始至终都使用默认的任务调度程序。在这种情况下,这将调用 TaskScheduler.TryRunInline,这将返回false 如果委托是已经调用了。因此,如果任务已经开始在线程池线程上运行,这将返回到 WaitAllCore,它只会执行正常的等待,并且您的代码将按如下方式工作预期(十分之 1)。

如果线程池线程尚未获取它(十分之九),则 TaskScheduler.TryRunInline 将调用 TaskScheduler.TryExecuteTaskInline,其默认实现将调用Task.ExecuteEntryUnsafe,它调用 Task.ExecuteWithThreadLocalTask.ExecuteWithThreadLocal如果捕获了 ExecutionContext,则应用该逻辑。假设没有捕获任何内容,则任务的委托只是 直接调用

所以,看起来每一步都是合乎逻辑的。从技术上讲,ExecutionContext.SuppressFlow 的意思是“不捕获ExecutionContext”,而这就是正在发生的情况。它并不意味着“清除 ExecutionContext”。有时,任务在线程池线程上运行(没有捕获的 ExecutionContext),而 WaitAll 将等待它完成。其他时候,任务将由 WaitAll 而不是线程池线程内联执行,在这种情况下,ExecutionContext 不会被清除(技术上也不会被捕获) 。

您可以通过捕获 wrapper 中的当前线程 ID 并将其与执行 Task.WaitAll 的线程 ID 进行比较来测试这一理论。我希望对于异步本地值(意外地)继承的运行来说,它们将是相同的线程,并且对于异步本地值按预期工作的运行来说,它们将是不同的线程。

Why does this happen? I suspect this happens because Task.Run may use the current thread to execute the work load.

I suspect that it happens because Task.WaitAll will use the current thread to execute the task inline.

Specifically, Task.WaitAll calls Task.WaitAllCore, which will attempt to run it inline by calling Task.WrappedTryRunInline. I'm going to assume the default task scheduler is used throughout. In that case, this will invoke TaskScheduler.TryRunInline, which will return false if the delegate is already invoked. So, if the task has already started running on a thread pool thread, this will return back to WaitAllCore, which will just do a normal wait, and your code will work as expected (1 out of 10).

If a thread pool thread hasn't picked it up yet (9 out of 10), then TaskScheduler.TryRunInline will call TaskScheduler.TryExecuteTaskInline, the default implementation of which will call Task.ExecuteEntryUnsafe, which calls Task.ExecuteWithThreadLocal. Task.ExecuteWithThreadLocal has logic for applying an ExecutionContext if one was captured. Assuming none was captured, the task's delegate is just invoked directly.

So, it seems like each step is behaving logically. Technically, what ExecutionContext.SuppressFlow means is "don't capture the ExecutionContext", and that is what is happening. It doesn't mean "clear the ExecutionContext". Sometimes the task is run on a thread pool thread (without the captured ExecutionContext), and WaitAll will just wait for it to complete. Other times the task will be executed inline by WaitAll instead of a thread pool thread, and in that case the ExecutionContext is not cleared (and technically isn't captured, either).

You can test this theory by capturing the current thread id within your wrapper and comparing it to the thread id doing the Task.WaitAll. I expect that they will be the same thread for the runs where the async local value is (unexpectedly) inherited, and they will be different threads for the runs where the async local value works as expected.

你如我软肋 2025-01-19 17:04:19

如果可以的话,我首先考虑是否可以用单个共享缓存替换线程特定的缓存。该应用程序可能早于诸如 ConcurrentDictionary 之类的有用类型。

如果无法使用单例缓存,则可以使用异步本地值的堆栈。堆叠异步本地值是一种常见模式。我更喜欢将堆栈逻辑包装成单独的类型(下面代码中的 AsyncLocalValue):

public sealed class AsyncLocalValue
{
    private static readonly AsyncLocal<ImmutableStack<object>> _asyncLocal = new();

    public object Value => _asyncLocal.Value?.Peek();

    public IDisposable PushValue(object value)
    {
        var originalValue = _asyncLocal.Value;
        var newValue = (originalValue ?? ImmutableStack<object>.Empty).Push(value);
        _asyncLocal.Value = newValue;
        return Disposable.Create(() => _asyncLocal.Value = originalValue);
    }
}

private static AsyncLocalValue AsyncLocal = new();

[TestMethod]
public void Test()
{
    Console.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
    {
        Task anotherTask;
        using (AsyncLocal.PushValue("1"))
        {
            using (AsyncLocal.PushValue(null))
            {
                anotherTask = Task.Run(() =>
                {
                    Console.WriteLine("Observed: " + AsyncLocal.Value);
                    using (AsyncLocal.PushValue("2"))
                    {
                    }
                });
            }
        }

        Task.WaitAll(anotherTask);
    });

    mainTask.Wait(500000, CancellationToken.None);
}

此代码示例使用我的 Nito.Disposables 库

If you can, I'd first consider whether it's possible to replace the thread-specific caches with a single shared cache. The app likely predates useful types such as ConcurrentDictionary.

If it isn't possible to use a singleton cache, then you can use a stack of async local values. Stacking async local values is a common pattern. I prefer wrapping the stack logic into a separate type (AsyncLocalValue in the code below):

public sealed class AsyncLocalValue
{
    private static readonly AsyncLocal<ImmutableStack<object>> _asyncLocal = new();

    public object Value => _asyncLocal.Value?.Peek();

    public IDisposable PushValue(object value)
    {
        var originalValue = _asyncLocal.Value;
        var newValue = (originalValue ?? ImmutableStack<object>.Empty).Push(value);
        _asyncLocal.Value = newValue;
        return Disposable.Create(() => _asyncLocal.Value = originalValue);
    }
}

private static AsyncLocalValue AsyncLocal = new();

[TestMethod]
public void Test()
{
    Console.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
    {
        Task anotherTask;
        using (AsyncLocal.PushValue("1"))
        {
            using (AsyncLocal.PushValue(null))
            {
                anotherTask = Task.Run(() =>
                {
                    Console.WriteLine("Observed: " + AsyncLocal.Value);
                    using (AsyncLocal.PushValue("2"))
                    {
                    }
                });
            }
        }

        Task.WaitAll(anotherTask);
    });

    mainTask.Wait(500000, CancellationToken.None);
}

This code sample uses Disposable.Create from my Nito.Disposables library.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文