Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How should I propagate task exception from continuation task in .NET 4.0?

I have the following code:

innerExceptions = dbconnByServer
    .AsParallel()
    .WithDegreeOfParallelism(dbconnByServer.Count)
    // A stream of groups of server connections proceeding in parallel per server
    .Select(dbconns => dbconns.Select(dbconn => m_sqlUtilProvider.Get(dbconn)))
    // A stream of groups of SqlUtil objects proceeding in parallel per server
    .Select(sqlUtils => GetTaskException(sqlUtils
        // Aggregate SqlUtil objects to form a single Task which runs the SQL asynchronously for the first SqlUtil, then upon completion
        // for the next SqlUtil and so long until all the SqlUtil objects are processed asynchronously one after another.
        .Aggregate<ISqlUtil, Task>(null, (res, sqlUtil) =>
        {
            if (res == null)
            {
                return sqlUtil.ExecuteSqlAsync(SQL, parameters);
            }
            return res.ContinueWith(_ => sqlUtil.ExecuteSqlAsync(SQL, parameters)).Unwrap();
        })))
    .Where(e => e != null)
    .ToList();

Where:

private static Exception GetTaskException(Task t)
{
    try
    {
        t.Wait();
        return null;
    }
    catch (Exception exc)
    {
        return exc;
    }
}

What this code does is execute certain SQL statement across a multitude of db connections, where some connections may belong to one DB server, while others - to another and so on.

The code makes sure that two conditions hold:

  1. The SQL statements are run in parallel across the available DB servers.
  2. Within the same DB server, the SQL statements are run asynchronously, but sequentially.

Given N db connections per some DB server there will be a single Task in the end of the aggregation, executing which has the following effect:

  • Execute SQL for db conn 1
    • Upon completion of previous, execute SQL for db conn 2
      • Upon completion of previous, execute SQL for db conn 3
      • ...
        • Upon completion of previous, execute SQL for db conn N

My problem is that right now exceptions are lost except of the very first db connection. I know I should examine the _ argument and somehow process the _.Exception property inside the continuation function. I wonder if there is an elegant way to do it.

Any ideas?

like image 783
mark Avatar asked Nov 13 '22 03:11

mark


1 Answers

  1. If you're going to use asynchronous methods, then the whole method should be asynchronous and shouldn't block any threads.
  2. If you're not going to block, there is not much reason to use PLINQ, setting up the continuations from a single thread should be fast enough.
  3. If you want to continue when an exception happens, you will have to store the exceptions somewhere by yourself.
  4. I feel kind of uneasy about working with a collection of exceptions without throwing them, but I guess that's okay for an operation, that can partially fail and partially succeed.

With that, my code for doing this would look like this:

public Task<IEnumerable<Exception>> ExecuteOnServersAsync(
    IList<IEnumerable<Connection>> dbConnByServer,
    string sql, object parameters)
{
    var tasks = new List<Task>();
    var exceptions = new ConcurrentQueue<Exception>();

    Action<Task> handleException = t =>
    {
        if (t.IsFaulted)
            exceptions.Enqueue(t.Exception);
    };

    foreach (var dbConns in dbConnByServer)
    {
        Task task = null;

        foreach (var dbConn in dbConns)
        {
            var sqlUtil = m_sqlUtilProvider.Get(dbConn);

            if (task == null)
            {
                task = sqlUtil.ExecuteSqlAsync(sql, parameters);
            }
            else
            {
                task = task.ContinueWith(
                    t =>
                    {
                        handleException(t);
                        return sqlUtil.ExecuteSqlAsync(sql, parameters);
                    }).Unwrap();
            }
        }

        if (task != null)
        {
            task = task.ContinueWith(handleException);
            tasks.Add(task);
        }
    }

    return Task.Factory.ContinueWhenAll(
        tasks.ToArray(), _ => exceptions.AsEnumerable());
}
like image 136
svick Avatar answered Nov 14 '22 21:11

svick