Azure TableQuery thread safety with Parallel.ForEach

I have some basic Azure tables that I've been querying serially:

var query = new TableQuery<DynamicTableEntity>()
  .Where(TableQuery.GenerateFilterCondition("PartitionKey",
    QueryComparisons.Equal, myPartitionKey));

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) {
  // Process entity here.
}

To speed this up, I parallelized the loop like so:

Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => {
  // Process entity here in a thread-safe manner.

  // Edited to add: Details of the loop body below:

  // This is the essence of the fixed loop body:
  lock (myLock) {
    DataRow myRow = myDataTable.NewRow();
    // [Add entity data to myRow.]
    myDataTable.Rows.Add(myRow);
  }

  // Old code (apparently not thread-safe, though NewRow() is supposed to create
  // a DataRow based on the table's schema without changing the table state):
  /*
    DataRow myRow = myDataTable.NewRow();
    lock (myLock) {
      // [Add entity data to myRow.]
      myDataTable.Rows.Add(myRow);
    }
  */
});

This produces a significant speedup, but the results tend to differ slightly between runs (i.e., some of the entities differ occasionally, though the number of entities returned is exactly the same).

From this and some web searching, I conclude that the enumerator above is not always thread-safe. The documentation appears to suggest that thread safety is guaranteed only if the table objects are public static, but that hasn't made a difference for me.

Could someone suggest how to resolve this? Is there a standard pattern for parallelizing Azure table queries?

Paul Lambert asked Oct 05 '14 06:10



1 Answer

Your comment is correct: DataTable is not safe for concurrent operations that mutate it, and it is the source of the duplicate entries. Locking the DataTable object around all row-modification operations, including NewRow(), resolves the issue (in this answer, myTable is the DataTable, not the Azure table):

 lock (myTable)
 {
    DataRow myRow = myTable.NewRow();
    myRow.SetField<int>("c1", (int)value);
    myTable.Rows.Add(myRow);
 }

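Putting NewRow() outside the lock intermittently results in duplicate row entries in the table, or in "An unhandled exception of type 'System.ArgumentException' occurred in System.Data.dll" exceptions on the NewRow() line. Despite appearances, NewRow() is not a read-only operation: it allocates a new record in the DataTable's internal storage, so it has to be serialized together with Rows.Add(). For additional details and alternatives for concurrent DataTable usage, see Thread safety for DataTable.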
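One alternative that avoids locking altogether is to keep the DataTable single-threaded: run the expensive per-entity work in parallel, then add the finished results to the table on one thread. The sketch below illustrates that idea and is not the original answer's code; it uses plain int values in place of the Azure entities, and the Transform method and SerialFillExample class are hypothetical stand-ins for the real per-entity processing.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Linq;

static class SerialFillExample
{
    // Hypothetical stand-in for the expensive per-entity processing.
    static int Transform(int value) => value * 2;

    public static DataTable Build(IEnumerable<int> source)
    {
        // Parallel stage: pure computation only, no shared mutable state.
        // AsOrdered() keeps results in source order, so every run produces the same table.
        List<int> results = source.AsParallel().AsOrdered().Select(Transform).ToList();

        // Serial stage: every DataTable call happens on this one thread,
        // so NewRow() and Rows.Add() need no lock.
        var table = new DataTable();
        table.Columns.Add("c1", typeof(int));
        foreach (int result in results)
        {
            DataRow row = table.NewRow();
            row["c1"] = result;
            table.Rows.Add(row);
        }
        return table;
    }
}
```

The trade-off is that all results are held in memory before the table is filled, and AsOrdered() costs some throughput in exchange for a deterministic row order; drop it if order does not matter.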

To reproduce the error condition, use this code. Some runs will be clean, some will contain duplicate entries, and some will encounter exceptions.

   using System;
   using System.Collections.Generic;
   using System.Data;
   using System.Threading;

   class Program
   {
      static DataTable myTable = GetTable();
      static ManualResetEvent waitHandle = new ManualResetEvent(false);

      static void Main(string[] args)
      {
         const int threadCount = 10;
         List<Thread> threads = new List<Thread>();
         for (int i = 0; i < threadCount; ++i) 
         {
            threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread)));
            threads[i].Start(i);
         }
         waitHandle.Set(); // Release all the threads at once
         for (int i = 0; i < threadCount; ++i) 
         {
            threads[i].Join();
         }

         // Print results once threads return
         for (int i = 0; i < myTable.Rows.Count; ++i)
         {
            Console.WriteLine(myTable.Rows[i].Field<int>(0));
         }
         Console.WriteLine("---Processing Complete---");
         Console.ReadKey();
      }

      static void AddRowThread(object value)
      {
         waitHandle.WaitOne();
         DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS
         lock (myTable)
         {
            //DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE
            myRow.SetField<int>("c1", (int)value);
            myTable.Rows.Add(myRow);
         }
      }

      static DataTable GetTable()
      {
         // Create a DataTable with a single int column, "c1".
         DataTable table = new DataTable();
         table.Columns.Add("c1", typeof(int));       
         return table;
      }
   }
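Because the printed list is easy to misread, a small check can classify each run automatically. This helper is a hypothetical addition, not part of the original answer; it assumes the single int column "c1" created by GetTable() and the thread indices 0 to threadCount - 1 that Main passes to each thread.

```csharp
using System.Collections.Generic;
using System.Data;

static class RunChecker
{
    // Returns true when every value 0..expectedCount-1 appears exactly once in column "c1".
    public static bool IsClean(DataTable table, int expectedCount)
    {
        if (table.Rows.Count != expectedCount) return false;
        var seen = new HashSet<int>();
        foreach (DataRow row in table.Rows)
        {
            // A DBNull or non-int cell also marks the run as corrupted.
            if (!(row["c1"] is int value)) return false;
            // Out-of-range values or repeats indicate duplicated or overwritten rows.
            if (value < 0 || value >= expectedCount || !seen.Add(value)) return false;
        }
        return true;
    }
}
```

For example, adding Console.WriteLine(RunChecker.IsClean(myTable, threadCount) ? "clean" : "corrupted"); after the Join loop in Main labels each run; a run whose NewRow() call throws crashes before reaching that line.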
Michael Myrah - MSFT answered Sep 29 '22 09:09
