Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Strange execution jump when using async/await and System.Threading.Tasks.Parallel

I have the following method:

public async Task ExecuteAsync()
{
     Task<IEnumerable<Comment>> gettingComments = RetrieveComments();

     Dictionary<string, ReviewManager> reviewers = ConfigurationFacade.Repositories.ToDictionary(name => name, name => new ReviewManager(name));

     IEnumerable<Comment> comments = await gettingComments;

     Parallel.ForEach(reviewers, (reviewer) => {
          Dictionary<Comment, RevisionResult> reviews = reviewer.Value.Review(comments);

          int amountModerated = ModerateComments(reviews.Where(r => r.Value.IsInsult), "hide");
     });
}

My ModerateComments method looks like the following:

private Task<int> ModerateComments(IEnumerable<Comment> comments, string operation)
{
      return Task.Factory.StartNew(() =>
      {
          int moderationCount = 0;
          Parallel.ForEach(comments, async (comment) => 
          {
               bool moderated = await ModerateComment(comment, operation); //Problem here
               if(moderated)
                   moderationCount++;
          }
          return moderationCount;
      };
}

And finally:

private async Task<bool> ModerateComment(Comment comment, string operation, string authenticationToken = null)
{
      if(comment == null) return false;

      if(String.IsNullOrWhiteSpace(authenticationToken))
             authenticationToken = CreateUserToken(TimeSpan.FromMinutes(1));

      string moderationEndpoint = ConfigurationFacade.ModerationEndpoint;

      using(HttpRequestMessage request = new HttpRequestMessage())
      {
          request.Method = HttpMethod.Post;
          request.RequestUri = new Uri(moderationEndpoint);
          using(HttpResponseMessage response = await _httpClient.SendAsync(request)) //Problem here
          {
               if(!response.IsSuccessStatusCode)
               {
                    if(response.StatusCode == HttpStatusCode.Unauthorized)
                        return await ModerateComment(comment, operation, null); //Retry operation with a new access token
                    else if(response.StatusCode == HttpStatusCode.GatewayTimeout)
                        return await ModerateComment(comment, operation, authenticationToken); //Retry operation

                    return false;
               } 
          }
      }

      return true;
}

I'm having a strange problem at runtime. All the above code is working fine except when it reaches the line:

using(HttpResponseMessage response = await _httpClient.SendAsync(request)) {
      //...
}

When I debug my application, this instruction is executed but just after that, it does not throw any exception, nor return anything, it just finish executing and I am derived to the next statement on the Parallel.ForEach loop.

It is really hard to explain so I'll post some images:

All good so far, I reach the following line of code: Image01

The execution keeps going well and I reach the call to the Moderation API Image02

Even if I press F10 (Next statement) in the debugger, the execution flow jumps to the next loop in the Parallel.ForEach loop.

Image03

As you can see I have breakpoints in the try-catch just i ncase any exception is thrown, but the breakpoint is never activated, neither is activated the breakpoint in if(moderacion) commentCount++.

So what happens here? Where did my execution flow went? It just dissapears after sending the POST request to the API.

After continuing the execution, all the elements in the enumerable do the same jump, and therefore, my commentCount variable ends up being equal to 0 Image04

like image 695
Matias Cicero Avatar asked Mar 15 '23 14:03

Matias Cicero


2 Answers

You don't need Parallel.ForEach or Task.Factory.StartNew to do IO bound work:

private async Task<int> ModerateCommentsAsync(IEnumerable<Comment> comments, string operation)
{
      var commentTasks = comments.Select(comment => ModerateCommentAsync(comment, operation));

      await Task.WhenAll(commentTasks);
      return commentTasks.Count(x => x.Result);
}

Common practice is to add the Async postfix to an async method.

like image 200
Yuval Itzchakov Avatar answered Mar 18 '23 07:03

Yuval Itzchakov


Excellent description for a common problem. Parallel.ForEach does not support async lambdas. async methods return once they hit the first await that would need to block. This happens when you issue the HTTP request.

Use one of the common patterns for a parallel async foreach loop.

like image 28
usr Avatar answered Mar 18 '23 09:03

usr