
Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc.?

Please see this question for background information:

How do Tasks in the Task Parallel Library affect ActivityID?

That question asks how Tasks affect Trace.CorrelationManager.ActivityId. @Greg Samson answered his own question with a test program showing that ActivityId is reliable in the context of Tasks. The test program sets an ActivityId at the beginning of the Task delegate, sleeps to simulate work, then checks the ActivityId at the end to make sure that it is the same value (i.e. that it has not been modified by another thread). The program runs successfully.

While researching other "context" options for threading, Tasks, and Parallel operations (ultimately to provide better context for logging), I ran into a strange issue with Trace.CorrelationManager.LogicalOperationStack (it was strange to me anyway). I have copied my "answer" to his question below.

I think that it adequately describes the issue that I ran into (Trace.CorrelationManager.LogicalOperationStack apparently getting corrupted - or something - when used in the context of Parallel.For, but only if the Parallel.For itself is enclosed in a logical operation).

Here are my questions:

  1. Should Trace.CorrelationManager.LogicalOperationStack be usable with Parallel.For? If so, should it make a difference if a logical operation is already in effect when the Parallel.For is started?

  2. Is there a "correct" way to use LogicalOperationStack with Parallel.For? Could I code this sample program differently so that it "works"? By "works", I mean that the LogicalOperationStack always has the expected number of entries and the entries themselves are the expected entries.

I have done some additional testing using Threads and ThreadPool threads, but I would have to go back and retry those tests to see if I ran into similar problems.

I will say that it does appear that Task/Parallel threads and ThreadPool threads DO "inherit" the Trace.CorrelationManager.ActivityId and Trace.CorrelationManager.LogicalOperationStack values from the parent thread. This is expected as these values are stored by the CorrelationManager using CallContext's LogicalSetData method (as opposed to SetData).
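The inheritance described above can be checked with a small, self-contained sketch. It is only an illustration, not part of the test program further down: the class and method names (InheritanceDemo, ChildSeesParentContext) are made up for this example, and it uses a plain Thread rather than a Task or ThreadPool thread to keep things simple.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical demo class; the names are not from the original post.
public static class InheritanceDemo
{
    // Returns true if a newly started thread sees the parent's ActivityId
    // and the same number of entries on the LogicalOperationStack.
    public static bool ChildSeesParentContext()
    {
        Guid activity = Guid.NewGuid();
        Trace.CorrelationManager.ActivityId = activity;
        Trace.CorrelationManager.StartLogicalOperation("parent operation");
        try
        {
            int parentCount = Trace.CorrelationManager.LogicalOperationStack.Count;
            Guid childActivity = Guid.Empty;
            int childCount = -1;

            Thread child = new Thread(() =>
            {
                // Read the values as seen from inside the child thread.
                childActivity = Trace.CorrelationManager.ActivityId;
                childCount = Trace.CorrelationManager.LogicalOperationStack.Count;
            });
            child.Start();
            child.Join();

            return childActivity == activity && childCount == parentCount;
        }
        finally
        {
            // Balance the StartLogicalOperation call above.
            Trace.CorrelationManager.StopLogicalOperation();
        }
    }

    public static void Main()
    {
        Console.WriteLine("Child inherited parent context: {0}", ChildSeesParentContext());
    }
}
```

Note that this only reads the inherited values in the child. It does not start or stop logical operations inside the child, which is where the trouble described below begins.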

Again, please refer back to this question to get the original context for the "answer" that I posted below:

How do Tasks in the Task Parallel Library affect ActivityID?

See also this similar question (which so far has not been answered) on Microsoft's Parallel Extensions forum:

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[BEGIN PASTE]

Please forgive my posting this as an answer as it is not really an answer to your question, however, it is related to your question since it deals with CorrelationManager behavior and threads/tasks/etc. I have been looking at using the CorrelationManager's LogicalOperationStack (and StartLogicalOperation/StopLogicalOperation methods) to provide additional context in multithreading scenarios.

I took your example and modified it slightly to add the ability to perform work in parallel using Parallel.For. Also, I use StartLogicalOperation/StopLogicalOperation to bracket (internally) DoLongRunningWork. Conceptually, DoLongRunningWork does something like this each time it is executed:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

I have found that if I add these logical operations to your code (more or less as is), all of the logical operations remain in sync (always the expected number of operations on the stack, and the values of the operations on the stack are always as expected).

In some of my own testing I found that this was not always the case. The logical operation stack was getting "corrupted". The best explanation I could come up with is that the "merging" back of the CallContext information into the "parent" thread context when the "child" thread exits was causing the "old" child thread context information (logical operation) to be "inherited" by another sibling child thread.

The problem might also be related to the fact that Parallel.For apparently uses the main thread (at least in the example code, as written) as one of the "worker threads" (or whatever they should be called in the parallel domain). Whenever DoLongRunningWork is executed, a new logical operation is started (at the beginning) and stopped (at the end) (that is, pushed onto the LogicalOperationStack and popped back off of it). If the main thread already has a logical operation in effect and if DoLongRunningWork executes ON THE MAIN THREAD, then a new logical operation is started so the main thread's LogicalOperationStack now has TWO operations. Any subsequent executions of DoLongRunningWork (as long as this "iteration" of DoLongRunningWork is executing on the main thread) will (apparently) inherit the main thread's LogicalOperationStack (which now has two operations on it, rather than just the one expected operation).

It took me a long time to figure out why the behavior of the LogicalOperationStack was different in my example than in my modified version of your example. Finally I saw that in my code I had bracketed the entire program in a logical operation, whereas in my modified version of your test program I did not. The implication is that in my test program, each time my "work" was performed (analogous to DoLongRunningWork), there was already a logical operation in effect. In my modified version of your test program, I had not bracketed the entire program in a logical operation.

So, when I modified your test program to bracket the entire program in a logical operation AND if I am using Parallel.For, I ran into exactly the same problem.

Using the conceptual model above, this will run successfully:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

While this will eventually assert due to an apparently out of sync LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Here is my sample program. It is similar to yours in that it has a DoLongRunningWork method that manipulates the ActivityId as well as the LogicalOperationStack. I also have two flavors of kicking off DoLongRunningWork. One flavor uses Tasks, the other uses Parallel.For. Each flavor can also be executed such that the whole parallelized operation is enclosed in a logical operation or not. So, there are a total of 4 ways to execute the parallel operation. To try each one, simply uncomment the desired "Use..." method, recompile, and run. UseTasks, UseTasks(true), and UseParallelFor should all run to completion. UseParallelFor(true) will assert at some point because the LogicalOperationStack does not have the expected number of entries.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      //The message reports the actual and expected top-of-stack entries (not counts).
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Top = {2}, ExpectedTop = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

This whole issue of whether LogicalOperationStack can be used with Parallel.For (and/or other threading/Task constructs), and how, probably merits its own question. Maybe I will post a question. In the meantime, I wonder if you have any thoughts on this (or, I wonder if you had considered using LogicalOperationStack since ActivityId appears to be safe).

[END PASTE]

Does anyone have any thoughts on this issue?

wageoghe asked Jan 18 '11 21:01


2 Answers

[Begin Update]

I also asked this question on Microsoft's Parallel Extensions for .Net support forum and eventually received an answer from Stephen Toub. It turns out that there is a bug in the LogicalCallContext that is causing the LogicalOperationStack to be corrupted. There is also a nice description (in a followup by Stephen to a reply that I made to his answer) that gives a brief overview of how Parallel.For works regarding doling out Tasks and why that makes Parallel.For susceptible to the bug.

In my answer below I speculate that LogicalOperationStack is not compatible with Parallel.For because Parallel.For uses the main thread as one of the "worker" threads. Based on Stephen's explanation, my speculation was incorrect. Parallel.For does use the main thread as one of the "worker" threads, but it is not simply used "as is". The first Task is run on the main thread, but is run in such a way that it is as if it is run on a new thread. Read Stephen's description for more info.

[End Update]

From what I can tell, the answer is as follows:

Both ActivityId and LogicalOperationStack are stored via CallContext.LogicalSetData. That means that these values will be "flowed" to any "child" threads. That is pretty cool as you could, for example, set ActivityId at the entry point into a multithreaded server (say a service call) and all threads that are ultimately started from that entry point can be part of the same "activity". Similarly, logical operations (via the LogicalOperationStack) also flow to the child threads.
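To make the LogicalSetData versus SetData distinction concrete, here is a minimal sketch. It assumes the .NET Framework, since CallContext lives in System.Runtime.Remoting.Messaging there; the slot names and the CallContextFlowDemo class are invented for the example.

```csharp
using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

// Hypothetical demo class; slot names are arbitrary and not used by the framework.
public static class CallContextFlowDemo
{
    // Sets one slot with LogicalSetData and one with SetData on the calling
    // thread, then returns what a newly started thread finds in each slot.
    public static object[] ReadFromNewThread()
    {
        CallContext.LogicalSetData("demo.logical", "set by parent");
        CallContext.SetData("demo.plain", "set by parent");

        object[] seen = new object[2];
        Thread child = new Thread(() =>
        {
            seen[0] = CallContext.LogicalGetData("demo.logical"); // flows with the logical call context
            seen[1] = CallContext.GetData("demo.plain");          // tied to the thread that set it
        });
        child.Start();
        child.Join();

        // Clean up the slots on the calling thread.
        CallContext.FreeNamedDataSlot("demo.logical");
        CallContext.FreeNamedDataSlot("demo.plain");
        return seen;
    }

    public static void Main()
    {
        object[] seen = ReadFromNewThread();
        Console.WriteLine("LogicalSetData slot in child: {0}", seen[0] ?? "(null)");
        Console.WriteLine("SetData slot in child:        {0}", seen[1] ?? "(null)");
    }
}
```

The child should find a value only in the slot written with LogicalSetData, which is the mechanism that lets ActivityId and the LogicalOperationStack follow work onto other threads.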

With regards to Trace.CorrelationManager.ActivityId:

ActivityId seems to be compatible with all threading models that I have tested it with: Using threads directly, using ThreadPool, using Tasks, using Parallel.*. In all cases, ActivityId has the expected value.

With regards to Trace.CorrelationManager.LogicalOperationStack:

LogicalOperationStack seems to be compatible with most threading models, but NOT with Parallel.*. Using threads directly, ThreadPool, and Tasks, the LogicalOperationStack (as manipulated in the sample code provided in my question) maintains its integrity. At all times the contents of the LogicalOperationStack are as expected.

LogicalOperationStack is NOT compatible with Parallel.For. If a logical operation is "in effect" (that is, if you have called CorrelationManager.StartLogicalOperation) prior to starting the Parallel.* operation, and you then start a new logical operation in the context of the Parallel.* (i.e. in the delegate), then the LogicalOperationStack WILL be corrupted. (I should say that it will PROBABLY be corrupted. Parallel.* might not create any additional threads, which means that the LogicalOperationStack would be safe.)

The problem stems from the fact that Parallel.* uses the main thread (or, probably more correctly, the thread that starts the parallel operation) as one of its "worker" threads. That means that as "logical operations" are started and stopped in the "worker" thread that is the same as the "main" thread, the "main" thread's LogicalOperationStack is being modified. Even if the calling code (i.e. the delegate) maintains the stack correctly (ensuring that each StartLogicalOperation is "stopped" with a corresponding StopLogicalOperation), the "main" thread's stack is modified. Ultimately it seems (to me, anyway) that the LogicalOperationStack of the "main" thread is essentially being modified by two different "logical" threads: the "main" thread and a "worker" thread, which both happen to be the SAME thread.

I don't know the deep down specifics of exactly why this is not working (at least as I would expect it to work). My best guess is that each time the delegate is executed on a thread (that is not the same as the main thread), the thread "inherits" the current state of the main thread's LogicalOperationStack. If the delegate is currently executing on the main thread (being reused as a worker thread), and has started a logical operation, then one (or more than one) of the other parallelized delegates will "inherit" the main thread's LogicalOperationStack that now has one (or more) new logical operations in effect!

FWIW, I implemented the following "logical stack" (mainly for testing; I am not actually using it at the moment) to mimic the LogicalOperationStack, but in such a way that it will work with Parallel.*. Feel free to try it out and/or use it. To test, replace the calls to

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

in the sample code from my original question with calls to

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

This was inspired by the scope objects described by Brent VanderMeide here: http://www.dnrtv.com/default.aspx?showNum=114

You could use this class like this:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}
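
For readers on .NET Framework 4.6 or later (or .NET Core), the same scope-based idea can be sketched with AsyncLocal<T> instead of CallContext. To be clear, this is a swapped-in technique and not the code from this answer: OpNode, AsyncLocalOperationStack, and ParallelForCheck are hypothetical names, and the sketch only covers Push, Count, and disposal. It also repeats the failing scenario from the question (an outer operation around a Parallel.For whose delegate pushes its own operation) and counts how often the depth is not the expected 2.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Immutable stack node; pushing never mutates an existing node.
public sealed class OpNode
{
    public readonly OpNode Parent;
    public readonly string Name;
    public readonly int Depth;

    public OpNode(OpNode parent, string name)
    {
        Parent = parent;
        Name = name;
        Depth = parent == null ? 1 : parent.Depth + 1;
    }
}

// Hypothetical AsyncLocal-based variant of the OperationStack idea.
public static class AsyncLocalOperationStack
{
    private static readonly AsyncLocal<OpNode> Current = new AsyncLocal<OpNode>();

    public static int Count
    {
        get { OpNode top = Current.Value; return top == null ? 0 : top.Depth; }
    }

    public static IDisposable Push(string name)
    {
        OpNode previous = Current.Value;
        Current.Value = new OpNode(previous, name);
        return new Scope(previous);
    }

    // Restores the previous top of the stack when disposed.
    private sealed class Scope : IDisposable
    {
        private readonly OpNode restoreTo;
        private bool disposed;

        public Scope(OpNode restoreTo) { this.restoreTo = restoreTo; }

        public void Dispose()
        {
            if (disposed) return;
            disposed = true;
            Current.Value = restoreTo;
        }
    }
}

public static class ParallelForCheck
{
    // Runs the "outer operation + Parallel.For" scenario and returns the number
    // of iterations in which the observed depth differed from the expected 2.
    public static int CountMismatches()
    {
        int mismatches = 0;
        using (AsyncLocalOperationStack.Push("outer"))
        {
            Parallel.For(0, 100, i =>
            {
                using (AsyncLocalOperationStack.Push("work " + i))
                {
                    Thread.Sleep(10); // simulate work
                    if (AsyncLocalOperationStack.Count != 2)
                        Interlocked.Increment(ref mismatches);
                }
            });
        }
        return mismatches;
    }

    public static void Main()
    {
        Console.WriteLine("Mismatches: {0}", CountMismatches());
    }
}
```

Like the OperationStack above, this design never modifies a shared collection: each push creates a new node and each pop just points the slot back at the previous node, so one delegate's operations cannot leak into another's view of the stack.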

wageoghe answered Nov 25 '22 14:11


I was investigating a way to have a logical-stack that should work easily in an application that uses TPL heavily. I decided to use the LogicalOperationStack because it did all the stuff I needed without changing the existing code. But then I read about a bug in the LogicalCallContext:

https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

So I tried to find a workaround for this bug and I think I got it working for the TPL (Thank you ILSpy):

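using System;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Runtime.Remoting.Messaging;
using System.Threading;
using System.Threading.Tasks;

public static class FixLogicalOperationStackBug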
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}

BasvdL answered Nov 25 '22 14:11