CorrelationManager.LogicalOperationStack 是否与 Parallel.For、任务、线程等兼容

发布于 2024-10-12 09:03:38 字数 9845 浏览 9 评论 0 原文

请参阅此问题以获取背景信息:

How do Tasks in任务并行库影响 ActivityID?

该问题询问任务如何影响 Trace.CorrelationManager.ActivityId。 @Greg Samson 用一个测试程序回答了他自己的问题,表明 ActivityId 在任务上下文中是可靠的。测试程序在Task委托的开头设置一个ActivityId,休眠以模拟工作,然后在最后检查该ActivityId以确保它是相同的值(即它没有被另一个线程修改)。程序运行成功。

在研究线程、任务和并行操作的其他“上下文”选项(最终为日志记录提供更好的上下文)时,我遇到了一个奇怪的问题 Trace.CorrelationManager.LogicalOperationStack (无论如何,这对我来说很奇怪)。我在下面复制了他的问题的“答案”。

我认为它充分描述了我遇到的问题(Trace.CorrelationManager.LogicalOperationStack 显然在 Parallel.For 的上下文中使用时被损坏 - 或者其他东西,但前提是 Parallel.For 本身包含在逻辑操作中) 。

这是我的问题:

  1. Trace.CorrelationManager.LogicalOperationStack 是否可以与 Parallel.For 一起使用?如果是这样,如果启动 Parallel.For 时逻辑操作已经生效,是否会产生影响?

  2. 是否有“正确”的方法将 LogicalOperationStack 与 Parallel.For 一起使用?我可以对这个示例程序进行不同的编码以使其“有效”吗?我所说的“有效”是指 LogicalOperationStack 始终具有预期的条目数,并且条目本身就是预期的条目。

我已经使用线程和线程池线程进行了一些额外的测试,但我必须返回并重试这些测试,看看是否遇到类似的问题。

我想说的是,任务/并行线程和 ThreadPool 线程确实从父线程“继承”了 Trace.CorrelationManager.ActivityId 和 Trace.CorrelationManager.LogicalOperationStack 值。这是预期的,因为 CorrelationManager 使用 CallContext 的 LogicalSetData 方法(与 SetData 相对)。

再次,请参阅此问题以获取我在下面发布的“答案”的原始上下文:

任务并行库中的任务如何影响 ActivityID?

另请参阅 Microsoft 并行扩展论坛上的类似问题(迄今为止尚未得到解答):

http://social.msdn.microsoft。 com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[开始粘贴]

请原谅我将此作为答案发布,因为它不是真正的答案但是,对于您的问题,它与您的问题相关,因为它涉及 CorrelationManager 行为和线程/任务/等。我一直在考虑使用 CorrelationManager 的 LogicalOperationStack(和 StartLogicalOperation/StopLogicalOperation 方法)在多线程场景中提供额外的上下文。

我采用了您的示例并对其进行了稍微修改,以添加使用 Parallel.For 并行执行工作的能力。另外,我使用 StartLogicalOperation/StopLogicalOperation 来括起(内部)DoLongRunningWork。从概念上讲,DoLongRunningWork 每次执行时都会执行类似的操作:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

我发现,如果我将这些逻辑操作添加到代码中(或多或少按原样),所有逻辑操作都会保持同步(堆栈上的操作数始终是预期的,并且堆栈上操作的值始终符合预期)。

在我自己的一些测试中,我发现情况并非总是如此。逻辑操作堆栈正在“损坏”。我能想到的最好的解释是,当“子”线程退出时,将 CallContext 信息“合并”回“父”线程上下文会导致“旧”子线程上下文信息(逻辑操作)变为“由另一个兄弟子线程继承”。

该问题也可能与以下事实有关:Parallel.For 显然使用主线程(至少在示例代码中,如所写)作为“工作线程”之一(或在并行域中应调用它们的任何线程)。每当执行 DoLongRunningWork 时,都会启动一个新的逻辑操作(在开始时)并停止(在结束时)(即,推入 LogicalOperationStack 并从其中弹出)。如果主线程已经有一个有效的逻辑操作,并且 DoLongRunningWork 在主线程上执行,则将启动一个新的逻辑操作,因此主线程的 LogicalOperationStack 现在有两个操作。 DoLongRunningWork 的任何后续执行(只要 DoLongRunningWork 的这一“迭代”在主线程上执行)将(显然)继承主线程的 LogicalOperationStack(现在它有两个操作,而不仅仅是一个预期操作)。

我花了很长时间才弄清楚为什么 LogicalOperationStack 的行为在我的示例中与我的示例的修改版本中不同。最后我发现在我的代码中我已经将整个程序括在逻辑运算中,而在我的测试程序的修改版本中我没有。这意味着在我的测试程序中,每次执行我的“工作”(类似于DoLongRunningWork)时,已经有一个有效的逻辑操作。在我对测试程序的修改版本中,我没有将整个程序括在逻辑运算中。

因此,当我修改您的测试程序以将整个程序括在逻辑运算中并且如果我使用 Parallel.For 时,我遇到了完全相同的问题。

使用上面的概念模型,这将成功运行:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

虽然这最终会由于明显不同步的 LogicalOperationStack 而断言:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

这是我的示例程序。它与您的类似,因为它有一个操作 ActivityId 和 LogicalOperationStack 的 DoLongRunningWork 方法。我对 DoLongRunningWork 的踢法也有两种风格。一种使用任务,一种使用 Parallel.For。还可以执行每种风格,使得整个并行操作包含在逻辑操作中或不包含在逻辑操作中。因此,共有4种并行操作的执行方式。要尝试每一种方法,只需取消注释所需的“Use...”方法,重新编译并运行即可。 UseTasksUseTasks(true)UseParallelFor 应该全部运行完成。 UseParallelFor(true) 将在某个时刻断言,因为 LogicalOperationStack 没有预期的条目数。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

LogicalOperationStack 是否可以与 Parallel.For (和/或其他线程/任务构造)一起使用或如何使用它的整个问题可能值得它自己的问题。也许我会发布一个问题。同时,我想知道您对此是否有任何想法(或者,我想知道您是否考虑过使用 LogicalOperationStack,因为 ActivityId 似乎是安全的)。

[END PASTE]

有人对这个问题有什么想法吗?

Please see this question for background information:

How do Tasks in the Task Parallel Library affect ActivityID?

That question asks how Tasks affect Trace.CorrelationManager.ActivityId. @Greg Samson answered his own question with a test program showing that ActivityId is reliable in the context of Tasks. The test program sets an ActivityId at the beginning of the Task delegate, sleeps to simulate work, then checks the ActivityId at the end to make sure that it is the same value (i.e. that it has not been modified by another thread). The program runs successfully.

While researching other "context" options for threading, Tasks, and Parallel operations (ultimately to provide better context for logging), I ran into a strange issue with Trace.CorrelationManager.LogicalOperationStack (it was strange to me anyway). I have copied my "answer" to his question below.

I think that it adequately describes the issue that I ran into (Trace.CorrelationManager.LogicalOperationStack apparently getting corrupted - or something - when used in the context of Parallel.For, but only if the Parallel.For itself is enclosed in a logical operation).

Here are my questions:

  1. Should Trace.CorrelationManager.LogicalOperationStack be usable with Parallel.For? If so, should it make a difference if a logical operation is already in effect with the Parallel.For is started?

  2. Is there a "correct" way to use LogicalOperationStack with Parallel.For? Could I code this sample program differntly so that it "works"? By "works", I mean that the LogicalOperationStack always has the expected number of entries and the entries themselves are the expected entries.

I have done some additional testing using Threads and ThreadPool threads, but I would have to go back and retry those tests to see if I ran into similar problems.

I will say that it does appear that Task/Parallel threads and ThreadPool threads DO "inherit" the Trace.CorrelationManager.ActivityId and Trace.CorrelationManager.LogicalOperationStack values from the parent thread. This is expected as these values are stored by the CorrelationManager using CallContext's LogicalSetData method (as opposed to SetData).

Again, please refer back to this question to get the original context for the "answer" that I posted below:

How do Tasks in the Task Parallel Library affect ActivityID?

See also this similar question (which so far has not been answered) on Microsoft's Parallel Extensions forum:

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[BEGIN PASTE]

Please forgive my posting this as an answer as it is not really answer to your question, however, it is related to your question since it deals with CorrelationManager behavior and threads/tasks/etc. I have been looking at using the CorrelationManager's LogicalOperationStack (and StartLogicalOperation/StopLogicalOperation methods) to provide additional context in multithreading scenarios.

I took your example and modified it slightly to add the ability to perform work in parallel using Parallel.For. Also, I use StartLogicalOperation/StopLogicalOperation to bracket (internally) DoLongRunningWork. Conceptually, DoLongRunningWork does something like this each time it is executed:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

I have found that if I add these logical operations to your code (more or less as is), all of the logical operatins remain in sync (always the expected number of operations on stack and the values of the operations on the stack are always as expected).

In some of my own testing I found that this was not always the case. The logical operation stack was getting "corrupted". The best explanation I could come up with is that the "merging" back of the CallContext information into the "parent" thread context when the "child" thread exits was causing the "old" child thread context information (logical operation) to be "inherited" by another sibling child thread.

The problem might also be related to the fact that Parallel.For apparently uses the main thread (at least in the example code, as written) as one of the "worker threads" (or whatever they should be called in the parallel domain). Whenever DoLongRunningWork is executed, a new logical operation is started (at the beginning) and stopped (at the end) (that is, pushed onto the LogicalOperationStack and popped back off of it). If the main thread already has a logical operation in effect and if DoLongRunningWork executes ON THE MAIN THREAD, then a new logical operation is started so the main thread's LogicalOperationStack now has TWO operations. Any subsequent executions of DoLongRunningWork (as long as this "iteration" of DoLongRunningWork is executing on the main thread) will (apparently) inherit the main thread's LogicalOperationStack (which now has two operations on it, rather than just the one expected operation).

It took me a long time to figure out why the behavior of the LogicalOperationStack was different in my example than in my modified version of your example. Finally I saw that in my code I had bracketed the entire program in a logical operation, whereas in my modified version of your test program I did not. The implication is that in my test program, each time my "work" was performed (analogous to DoLongRunningWork), there was already a logical operation in effect. In my modified version of your test program, I had not bracketed the entire program in a logical operation.

So, when I modified your test program to bracket the entire program in a logical operation AND if I am using Parallel.For, I ran into exactly the same problem.

Using the conceptual model above, this will run successfully:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

While this will eventually assert due to an apparently out of sync LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Here is my sample program. It is similar to yours in that it has a DoLongRunningWork method that manipulates the ActivityId as well as the LogicalOperationStack. I also have two flavors of kicking of DoLongRunningWork. One flavor uses Tasks one uses Parallel.For. Each flavor can also be executed such that the whole parallelized operation is enclosed in a logical operation or not. So, there are a total of 4 ways to execute the parallel operation. To try each one, simply uncomment the desired "Use..." method, recompile, and run. UseTasks, UseTasks(true), and UseParallelFor should all run to completion. UseParallelFor(true) will assert at some point because the LogicalOperationStack does not have the expected number of entries.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

This whole issue of if LogicalOperationStack can be used with Parallel.For (and/or other threading/Task constructs) or how it can be used probably merits its own question. Maybe I will post a question. In the meantime, I wonder if you have any thoughts on this (or, I wonder if you had considered using LogicalOperationStack since ActivityId appears to be safe).

[END PASTE]

Does anyone have any thoughts on this issue?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

安静 2024-10-19 09:03:39

[开始更新]

我还在 上提出了这个问题Microsoft 的 Parallel Extensions for .Net 支持论坛,并最终收到了 来自 Stephen Toub 的回答。事实证明,有一个LogicalCallContext 中的错误导致 LogicalOperationStack 损坏。还有一个很好的描述(在 Stephen 对我对他的答案所做的回复的后续内容中),它简要概述了 Parallel.For 如何在分配任务方面发挥作用,以及为什么这使得 Parallel.For 容易受到该错误的影响。

在下面的回答中,我推测 LogicalOperationStack 与 Parallel.For 不兼容,因为 Parallel.For 使用主线程作为“工作”线程之一。根据斯蒂芬的解释,我的猜测是错误的。 Parallel.For 确实使用主线程作为“工作”线程之一,但它并不是简单地“按原样”使用。第一个任务在主线程上运行,但运行方式就像在新线程上运行一样。阅读斯蒂芬的描述以获取更多信息。

[结束更新]

据我所知,答案如下:

ActivityId 和 LogicalOperationStack 均通过 CallContext.LogicalSetData。这意味着这些值将“流向”任何“子”线程。这非常酷,例如,您可以在多线程服务器的入口点设置 ActivityId(例如服务调用),并且最终从该入口点启动的所有线程都可以是同一“活动”的一部分。类似地,逻辑操作(通过 LogicalOperationStack)也流向子线程。

关于 Trace.CorrelationManager.ActivityId:

ActivityId 似乎与我测试过的所有线程模型兼容:直接使用线程、使用 ThreadPool、使用任务、使用 Parallel.*。在所有情况下,ActivityId 都具有预期值。

关于 Trace.CorrelationManager.LogicalOperationStack:

LogicalOperationStack 似乎与大多数线程模型兼容,但与 Parallel.* 不兼容。直接使用线程、ThreadPool 和任务,LogicalOperationStack(如我的问题中提供的示例代码中所操作的)保持其完整性。 LogicalOperationStack 的内容始终如预期。

LogicalOperationStack 与 Parallel.For 不兼容。如果逻辑操作“有效”,也就是说,如果您在启动 Parallel.* 操作之前调用了 CorrelationManager.StartLogicalOperation,然后在 Parallel.* 的上下文中(即在委托中)启动新的逻辑操作,那么 LogicalOperationStack 将被损坏。 (我应该说它可能会被损坏。Parallel.* 可能不会创建任何额外的线程,这意味着 LogicalOperationStack 是安全的)。

该问题源于以下事实:Parallel.* 使用主线程(或者,可能更正确地,启动并行操作的线程)作为其“工作”线程之一。这意味着当“逻辑操作”在与“主”线程相同的“工作”线程中启动和停止时,“主”线程的 LogicalOperationStack 正在被修改。即使调用代码(即委托)正确维护堆栈(确保使用相应的 StopLogicalOperation“停止”每个 StartLogicalOperation),“主”线程堆栈也会被修改。最终(无论如何,对我来说),“主”线程的 LogicalOperationStack 本质上是由两个不同的“逻辑”线程修改的:“主”线程和“工作”线程,它们恰好是相同的线。

我不知道为什么这不起作用的深层细节(至少正如我所期望的那样)。我最好的猜测是,每次在线程(与主线程不同)上执行委托时,该线程都会“继承”主线程 LogicalOperationStack 的当前状态。如果委托当前正在主线程上执行(被重用为工作线程),并且已启动逻辑操作,则一个(或多个)其他并行委托将“继承”主线程的 LogicalOperationStack,该委托现在拥有一个(或多个)新逻辑运算生效!

FWIW,我实现了(主要是为了测试,目前我实际上并没有使用它),以下“逻辑堆栈”来模仿 LogicalOperationStack,但以使其可以与 Parallel 一起工作的方式进行。*请随意尝试它出来和/或使用它。 调用替换

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

要进行测试,请将原始问题中的示例代码中的

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

为对的调用。这受到 Brent VanderMeide 此处描述的范围对象的启发:http://www.dnrtv.com/default.aspx?showNum=114

您可以像这样使用此类:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}

[Begin Update]

I also asked this question on Microsoft's Parallel Extensions for .Net support forum and eventually received an answer from Stephen Toub. It turns out there there is a bug in the LogicalCallContext that is causing the LogicalOperationStack to be corrupted. There is also a nice description (in a followup by Stephen to a reply that I made to his answer) that gives a brief overiew of how Parallel.For works regarding doling out Tasks and why that makes Parallel.For susceptible to the bug.

In my answer below I speculate that LogicalOperationStack is not compatible with Parallel.For because Parallel.For uses the main thread as one of the "worker" threads. Based on Stephen's explanation, my speculation was incorrect. Parallel.For does use the main thread as one of the "worker" threads, but it is not simply used "as is". The first Task is run on the main thread, but is run in such a way that it is as if it is run on a new thread. Read Stephen's description for more info.

[End Update]

From what I can tell, the answer is as follows:

Both ActivityId and LogicalOperationStack are stored via CallContext.LogicalSetData. That means that these values will be "flowed" to any "child" threads. That is pretty cool as you could, for example, set ActivityId at the entry point into a multithreaded server (say a service call) and all threads that are ultimately started from that entry point can be part of the same "activity". Similarly, logical operations (via the LogicalOperationStack) also flow to the child threads.

With regards to Trace.CorrelationManager.ActivityId:

ActivityId seems to be compatible with all threading models that I have tested it with: Using threads directly, using ThreadPool, using Tasks, using Parallel.*. In all cases, ActivityId has the expected value.

With regards to Trace.CorrelationManager.LogicalOperationStack:

LogicalOperationStack seems to be compatible with most threading models, but NOT with Parallel.*. Using threads directly, ThreadPool, and Tasks, the LogicalOperationStack (as manipulated in the sample code provided in my question) maintains its integrity. At all times the contents of the LogicalOperationStack is as expected.

LogicalOperationStack is NOT compatible with Parallel.For. If a logical operation is "in effect", that is if you have called CorrelationManager.StartLogicalOperation, prior to starting the Parallel.* operation and then you start a new logical operation in the context of the Paralle.* (i.e. in the delegate), then the LogicalOperationStack WILL be corrupted. (I should say that it will PROBABLY be corrupted. Parallel.* might not create any additional threads, which means that the LogicalOperationStack would be safe).

The problem stems from the fact that Parallel.* uses the main thread (or, probably more correctly, the thread that starts the parallel operation) as one of its "worker" threads. That means that as "logical operations" are started and stopped in the "worker" thread that is the same as the "main" thread, the "main" thread's LogicalOperationStack is being modified. Even if the calling code (i.e. the delegate) maintains the stack correctly (ensuring that each StartLogicalOperation is "stopped" with a corresponding StopLogicalOperation), the "main" threads stack is modified. Ultimately it seems (to me, anyway), that the LogicalOperationStack of the "main" thread is essentially being modified by two different "logical" threads: the "main" thread and a "worker" thread, which both happen to be the SAME thread.

I don't know the deep down specifics of exactly why this is not working (at least as I would expect it work). My best guess is that each time the delegate is executed on a thread (that is not the same as the main thread), the thread "inherits" the current state of the main thread's LogicalOperationStack. If the delegate is currently executing on the main thread (being reused as a worker thread), and has started a logical operation, then one (or more than one) of the other parallelized delegates will "inherit" the main thread's LogicalOperationStack that now has one (or more) new logical operations in effect!

FWIW, I implemented (mainly for testing, I am not actually using it at the moment), the following "logical stack" to mimic the LogicalOperationStack, but do it in such a way that it will work with Parallel.* Feel free to try it out and/or use it. To test, replace the calls to

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

in the sample code from my original question with calls to

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

This was inspired by the scope objects described by Brent VanderMeide here: http://www.dnrtv.com/default.aspx?showNum=114

You could use this class like this:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}
聆听风音 2024-10-19 09:03:39

我正在研究一种拥有逻辑堆栈的方法,该逻辑堆栈应该在大量使用 TPL 的应用程序中轻松工作。我决定使用 LogicalOperationStack,因为它可以完成我需要的所有操作,而无需更改现有代码。但后来我读到了 LogicalCallContext 中的一个错误:

https://connect.microsoft.com/VisualStudio/feedback/details/609929/ticalcallcontext-clone-bug-when-correlationmanager-slot-is-present

所以我试图找到解决方法对于这个错误,我想我已经让它在 TPL 上工作了(谢谢 ILSpy):

public static class FixLogicalOperationStackBug
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}

I was investigating a way to have a logical-stack that should work easily in an application that uses TPL heavily. I decided to use the LogicalOperationStack because it did all the stuff I needed without changing the existing code. But then I read about a bug in the LogicalCallContext:

https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

So I tried to find a workaround for this bug and I think I got it working for the TPL (Thank you ILSpy):

public static class FixLogicalOperationStackBug
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文