
How to manage a mutex in an asynchronous method

I have ported my old HttpHandler (.ashx) TwitterFeed code to a WebAPI application. The core of the code uses the excellent Linq2Twitter package (https://linqtotwitter.codeplex.com/). Part of the port involved upgrading this component from version 2 to version 3, which now exposes a number of asynchronous methods. These are new to me. Here is the basic controller:

public async Task<IEnumerable<Status>> 
GetTweets(int count, bool includeRetweets, bool excludeReplies)
{
   var auth = new SingleUserAuthorizer
   {
      CredentialStore = new SingleUserInMemoryCredentialStore
      {
         ConsumerKey       = ConfigurationManager.AppSettings["twitterConsumerKey"],
         ConsumerSecret    = ConfigurationManager.AppSettings["twitterConsumerKeySecret"],
         AccessToken       = ConfigurationManager.AppSettings["twitterAccessToken"],
         AccessTokenSecret = ConfigurationManager.AppSettings["twitterAccessTokenSecret"]
      }
   };

   var ctx = new TwitterContext(auth);

   var tweets =
      await
      (from tweet in ctx.Status
         where (
            (tweet.Type == StatusType.Home)
            && (tweet.ExcludeReplies == excludeReplies)
            && (tweet.IncludeMyRetweet == includeRetweets)
            && (tweet.Count == count)
         )
      select tweet)
      .ToListAsync();

   return tweets;
}

This works fine, but previously, I had cached the results to avoid 'over calling' the Twitter API. It is here that I have run into a problem (more to do with my lack of understanding of the asynchronous protocol than anything else I suspect).

In overview, what I want to do is first check the cache; if the data doesn't exist, rehydrate the cache and return the data to the caller (web page). Here is my attempt at the code:

public class TwitterController : ApiController {

   private const string CacheKey = "TwitterFeed";

   public async Task<IEnumerable<Status>>
   GetTweets(int count, bool includeRetweets, bool excludeReplies)
   {
      var context = System.Web.HttpContext.Current;
      var tweets = await GetTweetData(context, count, includeRetweets, excludeReplies);
      return tweets;
   }

   private async Task<IEnumerable<Status>>
   GetTweetData(HttpContext context, int count, bool includeRetweets, bool excludeReplies)
   {
      var cache = context.Cache;
      Mutex mutex = null;
      bool iOwnMutex = false;
      IEnumerable<Status> data = (IEnumerable<Status>)cache[CacheKey];

      // Start check to see if available on cache
      if (data == null)
      {
         try
         {
            // Lock based on resource key
            mutex = new Mutex(true, CacheKey);

            // Wait until it is safe to enter (someone else might already be
            // doing this), but also add 30 seconds max.
            iOwnMutex = mutex.WaitOne(30000);

            // Now let's see if someone else has added it...
            data = (IEnumerable<Status>)cache[CacheKey];

            // They did, so send it...
            if (data != null)
            {
               return data;
            }

            if (iOwnMutex)
            {
               // Still not there, so now is the time to look for it!
               data = await CallTwitterApi(count, includeRetweets, excludeReplies);

               cache.Remove(CacheKey);
               cache.Add(CacheKey, data, null, GetTwitterExpiryDate(),
                  TimeSpan.Zero, CacheItemPriority.Normal, null);
            }
         }
         finally
         {
            // Release the Mutex.
            if ((mutex != null) && (iOwnMutex))
            {
               // The following line throws the error:
               // Object synchronization method was called from an
               // unsynchronized block of code.
               mutex.ReleaseMutex();
            }
         }
      }

      return data;
   }

   private DateTime GetTwitterExpiryDate()
   {
      string szExpiry = ConfigurationManager.AppSettings["twitterCacheExpiry"];
      int expiry = Int32.Parse(szExpiry);
      return DateTime.Now.AddMinutes(expiry);
   }

   private async Task<IEnumerable<Status>>
   CallTwitterApi(int count, bool includeRetweets, bool excludeReplies)
   {
      var auth = new SingleUserAuthorizer
      {
         CredentialStore = new SingleUserInMemoryCredentialStore
         {
            ConsumerKey = ConfigurationManager.AppSettings["twitterConsumerKey"],
            ConsumerSecret = ConfigurationManager.AppSettings["twitterConsumerKeySecret"],
            AccessToken = ConfigurationManager.AppSettings["twitterAccessToken"],
            AccessTokenSecret = ConfigurationManager.AppSettings["twitterAccessTokenSecret"]
         }
      };

      var ctx = new TwitterContext(auth);

      var tweets =
         await
         (from tweet in ctx.Status
          where (
             (tweet.Type == StatusType.Home)
             && (tweet.ExcludeReplies == excludeReplies)
             && (tweet.IncludeMyRetweet == includeRetweets)
             && (tweet.Count == count)
             && (tweet.RetweetCount < 1)
          )
          select tweet)
         .ToListAsync();

      return tweets;
   }

}

The problem occurs in the finally code block where the Mutex is released (though I have concerns about the overall pattern and approach of the GetTweetData() method):

if ((mutex != null) && (iOwnMutex))
{
    // The following line throws the error:
    // Object synchronization method was called from an
    // unsynchronized block of code.
    mutex.ReleaseMutex();
}

If I comment out the line, the code works correctly, but (I assume) I should release the Mutex having created it. From what I have found out, this problem is related to the thread changing between creating and releasing the mutex.

Because of my lack of general knowledge on asynchronous coding, I am not sure a) if the pattern I'm using is viable and b) if it is, how I address the problem.

Any advice would be much appreciated.

Neilski asked Mar 19 '14 18:03


1 Answer

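Using a mutex like that isn't going to work. For one thing, a Mutex is thread-affine: it must be released by the same thread that acquired it. After an await, the rest of the method can resume on a different thread, which is why ReleaseMutex throws "Object synchronization method was called from an unsynchronized block of code". So a Mutex can't be used with async code.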

Other problems I noticed:

  • Cache is threadsafe, so it shouldn't need a mutex (or any other protection) anyway.
  • Asynchronous methods should follow the Task-based Asynchronous Pattern (for example, their names should end in "Async").
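
To illustrate the thread-affinity point: if mutual exclusion around an await is ever genuinely needed, SemaphoreSlim is one primitive that has no owning thread and offers WaitAsync. The following is only a minimal sketch of that alternative, not something this answer requires. The class name AsyncLockSketch, the method RunExclusiveAsync, and the fetchAsync delegate are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLockSketch
{
    // A single permit makes the semaphore behave like a lock,
    // but unlike Mutex it is not tied to the acquiring thread.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    // fetchAsync is a hypothetical stand-in for the Twitter call.
    public static async Task<T> RunExclusiveAsync<T>(Func<Task<T>> fetchAsync, TimeSpan timeout)
    {
        // WaitAsync waits without blocking a thread; false means the timeout elapsed.
        if (!await Gate.WaitAsync(timeout))
            throw new TimeoutException("Could not acquire the gate in time.");

        try
        {
            return await fetchAsync();
        }
        finally
        {
            // Legal even if the method resumed on a different thread after the await.
            Gate.Release();
        }
    }
}
```

Note that a SemaphoreSlim only coordinates code within one process, whereas a named Mutex is visible across processes.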

There is one major tip regarding caching: when you just have an in-memory cache, then cache the task rather than the resulting data. On a side note, I have to wonder whether HttpContext.Cache is the best cache to use, but I'll leave it as-is since your question is more about how asynchronous code changes caching patterns.

So, I'd recommend something like this:

private const string CacheKey = "TwitterFeed";

public Task<IEnumerable<Status>> GetTweetsAsync(int count, bool includeRetweets, bool excludeReplies)
{
  var context = System.Web.HttpContext.Current;
  return GetTweetDataAsync(context, count, includeRetweets, excludeReplies);
}

private Task<IEnumerable<Status>> GetTweetDataAsync(HttpContext context, int count, bool includeRetweets, bool excludeReplies)
{
  var cache = context.Cache;
  Task<IEnumerable<Status>> data = cache[CacheKey] as Task<IEnumerable<Status>>;
  if (data != null)
    return data;
  data = CallTwitterApiAsync(count, includeRetweets, excludeReplies);
  cache.Insert(CacheKey, data, null, GetTwitterExpiryDate(), TimeSpan.Zero);
  return data;
}

private async Task<IEnumerable<Status>> CallTwitterApiAsync(int count, bool includeRetweets, bool excludeReplies)
{
  ...
}
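
One caveat when caching the task rather than the data: if the call fails, the faulted task stays in the cache until it expires, and every caller sees the same exception. A minimal sketch of one way to handle that is to evict the entry when the task faults or is cancelled. To keep the sketch runnable outside ASP.NET, it assumes a ConcurrentDictionary in place of HttpContext.Cache. TaskCacheSketch and fetchAsync are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class TaskCacheSketch<T>
{
    // Assumption: a dictionary stands in for HttpContext.Cache (no expiry here).
    private readonly ConcurrentDictionary<string, Task<T>> _cache =
        new ConcurrentDictionary<string, Task<T>>();

    public Task<T> GetAsync(string key, Func<Task<T>> fetchAsync)
    {
        Task<T> cached;
        if (_cache.TryGetValue(key, out cached))
            return cached;

        // Cache the in-flight task so later callers share the same request.
        Task<T> task = fetchAsync();
        _cache[key] = task;

        // If the request fails or is cancelled, drop the entry so the next caller retries.
        task.ContinueWith(t =>
        {
            if (t.IsFaulted || t.IsCanceled)
            {
                // Removes the pair only if the key still maps to this exact task.
                ((ICollection<KeyValuePair<string, Task<T>>>)_cache)
                    .Remove(new KeyValuePair<string, Task<T>>(key, t));
            }
        }, TaskScheduler.Default);

        return task;
    }
}
```

The caller that received the failed task still observes the exception when awaiting it; only subsequent callers get a fresh attempt.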

There's a small possibility that the feed will be requested twice if two different requests (from two different sessions) ask for the same Twitter feed at exactly the same time. But I wouldn't lose sleep over it.
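
If that duplicate request ever did matter, one possible way to close the gap relies on the fact that the lookup method itself contains no await: it only starts the call and stores the returned task. That means an ordinary lock around the check-and-insert is legal. This is a sketch under assumptions, not part of the recommendation above: a plain Dictionary stands in for HttpContext.Cache, and SingleFlightCacheSketch and fetchAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class SingleFlightCacheSketch<T>
{
    private readonly object _sync = new object();

    // Assumption: a dictionary stands in for HttpContext.Cache (no expiry here).
    private readonly Dictionary<string, Task<T>> _cache = new Dictionary<string, Task<T>>();

    public Task<T> GetAsync(string key, Func<Task<T>> fetchAsync)
    {
        // No await occurs inside the lock: fetchAsync only starts the request
        // and returns its task, so the lock is held briefly by one thread.
        lock (_sync)
        {
            Task<T> cached;
            if (_cache.TryGetValue(key, out cached))
                return cached;

            Task<T> task = fetchAsync();
            _cache[key] = task;
            return task;
        }
    }
}
```

The lock is held while fetchAsync runs up to its first incomplete await, so this only suits delegates that reach that point quickly.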

Stephen Cleary answered Oct 01 '22 09:10
