
How to make Sequential Processing as simple as Parallel Processing

I've got two .NET Task objects that I may wish to run in parallel or in sequence. In either case, I don't want to block a thread to wait for them. As it turns out, Reactive Extensions make the parallel story simply beautiful. But when I try to arrange the tasks in sequence, the code works but just feels awkward.

I'd like to know if anyone can show how to make the sequential version more concise, or as effortless to code as the parallel version. The answer does not need to use Reactive Extensions.

For reference, here are my two solutions for both parallel and sequential processing.

Parallel Processing Version

This is pure joy:

    public Task<string> DoWorkInParallel()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
        Task<bool> BravoTask = Task.Factory.StartNew(() => true);

        //Prepare for Rx, and set filters to allow 'Zip' to terminate early
        //in some cases.
        IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
        IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

        Observable
            .Zip(
                AsyncAlpha,
                AsyncBravo,
                (x, y) => y.ToString() + x.ToString())
            .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
                (x) => { result.TrySetResult(x); },
                (x) => { result.TrySetException(x.GetBaseException()); },
                () => { result.TrySetResult("Nothing"); });

        return result.Task;
    }

Sequential/Pipeline Processing Version

This works but is just clumsy:

    public Task<string> DoWorkInSequence()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);

        AlphaTask.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                result.TrySetException(x.Exception.GetBaseException());
            }
            else
            {
                if (x.Result != 5)
                {
                    Task<bool> BravoTask = Task.Factory.StartNew(() => true);
                    BravoTask.ContinueWith(y =>
                    {
                        if (y.IsFaulted)
                        {
                            result.TrySetException(y.Exception.GetBaseException());
                        }
                        else
                        {
                            if (y.Result)
                            {
                                result.TrySetResult(x.Result.ToString() + y.Result.ToString());
                            }
                            else
                            {
                                result.TrySetResult("Nothing");
                            }
                        }
                    });
                }
                else
                {
                    result.TrySetResult("Nothing");
                }
            }
        }
        );

        return result.Task;
    }

The sequential code above has become a mess, and I haven't even added the timeout capability to match the parallel version!

Requirements (UPDATED on 8/6)

For those answering, please be mindful that:

  1. The sequential scenario should permit the arrangement where the output of the first task feeds the input of the second. My sample "awkward" code above could easily have been arranged to achieve that.

  2. I'm interested in a .NET 4.5 answer, but a .NET 4.0 answer is equally or more important for me.

  3. Tasks 'Alpha' and 'Bravo' have a combined time-limit of 200ms in which to complete; they do not have 200ms each. This is true in the sequential case as well.

  4. The task exposed by the TaskCompletionSource must complete early, before both tasks complete, if either task returns an invalid result. An invalid result is either [AlphaTask:5] or [BravoTask:false], as indicated by the explicit tests in the sample code.
    Update 8/8: Clarification - In the sequential case, the BravoTask should not execute at all if AlphaTask is not successful or if the timeout has already occurred.

  5. Assume both AlphaTask and BravoTask cannot block. Not that it matters, but in my real-world scenario they are actually async WCF service calls.

Maybe there is an aspect of Rx I could have exploited to clean up the sequential version. But even just Task programming by itself should have a better story I'd imagine. We'll see.

ERRATA: In both code samples I changed the return type to Task<string>, as the answers were quite correct that I should not have been returning a TaskCompletionSource.

Brent Arias asked Aug 05 '13 14:08


2 Answers

If you can use async/await, Brandon has a nice answer. If you are still on VS2010, the first thing I would do to clean up the sequential version is to get an extension method like the Then method Stephen Toub described in a blog post. I would also implement a Task.FromResult method if you are not using .NET 4.5. With those, you could get:

public Task<string> DoWorkInSequence()
{
    return Task.FromResult(4)
        .Then(x =>
        {
            if (x != 5)
            {
                return Task.FromResult(true)
                    .Then(y =>
                    {
                        if (y)
                        {
                            return Task.FromResult(x.ToString() + y.ToString());
                        }
                        else
                        {
                            return Task.FromResult("Nothing");
                        }
                    });
            }
            else
            {
                return Task.FromResult("Nothing");
            }
        });
}
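
The snippet above leans on two helpers that .NET 4.0 does not ship. As a rough sketch of what they might look like (the class name TaskEx40 is made up for illustration, and the Then shown here is only modeled on the idea from the blog post, not copied from it):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class; the name is an assumption for this sketch.
public static class TaskEx40
{
    // Stand-in for Task.FromResult, which only exists from .NET 4.5 onward.
    public static Task<T> FromResult<T>(T value)
    {
        var tcs = new TaskCompletionSource<T>();
        tcs.SetResult(value);
        return tcs.Task;
    }

    // Runs 'next' only when 'first' succeeds; faults and cancellations
    // flow straight through to the returned task without blocking a thread.
    public static Task<TResult> Then<T, TResult>(
        this Task<T> first, Func<T, Task<TResult>> next)
    {
        if (first == null) throw new ArgumentNullException("first");
        if (next == null) throw new ArgumentNullException("next");

        var tcs = new TaskCompletionSource<TResult>();
        first.ContinueWith(completed =>
        {
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    Task<TResult> second = next(completed.Result);
                    if (second == null) { tcs.TrySetCanceled(); return; }
                    second.ContinueWith(inner =>
                    {
                        if (inner.IsFaulted) tcs.TrySetException(inner.Exception.InnerExceptions);
                        else if (inner.IsCanceled) tcs.TrySetCanceled();
                        else tcs.TrySetResult(inner.Result);
                    }, TaskContinuationOptions.ExecuteSynchronously);
                }
                catch (Exception ex)
                {
                    // 'next' itself threw before returning a task.
                    tcs.TrySetException(ex);
                }
            }
        }, TaskContinuationOptions.ExecuteSynchronously);
        return tcs.Task;
    }
}
```

With this in place, Task.FromResult(4) in the snippet would be written TaskEx40.FromResult(4) on .NET 4.0, since a static method cannot be added to Task itself.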

Also, you should generally return a Task instead of a TaskCompletionSource (which you can get by calling .Task on the TaskCompletionSource), since you don't want the caller to set a result on the task you are returning to them.

Brandon's answer also gives a good way to implement the timeout functionality (adjusting for the lack of async/await keywords).

EDIT To reduce arrow code, we can implement more of the LINQ methods. A SelectMany implementation is provided in the previously linked blog post. The other methods we will need for LINQ are Select and Where. These should be fairly straightforward once you've done Then and SelectMany, but here they are:

public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
    if (task == null) throw new ArgumentNullException("task");
    if (predicate == null) throw new ArgumentNullException("predicate");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
        {
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    if (predicate(completed.Result))
                        tcs.TrySetResult(completed.Result);
                    else
                        tcs.TrySetCanceled();
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            }
        });
    return tcs.Task;
}

public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
    if (task == null) throw new ArgumentNullException("task");
    if (selector == null) throw new ArgumentNullException("selector");

    var tcs = new TaskCompletionSource<TResult>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                tcs.TrySetResult(selector(completed.Result));
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    });
    return tcs.Task;
}

After that, one final non-LINQ extension method allows us to return a default value when cancelled:

public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
    if (task == null) throw new ArgumentNullException("task");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
        else tcs.TrySetResult(completed.Result);
    });
    return tcs.Task;
}

And the new and improved DoWork (sans timeout):

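public static Task<string> DoWorkInSequence()
{
    // Task_FromResult is presumably the hand-written .NET 4.0 stand-in for Task.FromResult.
    // Starting from 5 fails the first 'where', so this returns "Nothing";
    // starting from 4 would yield "4True" instead.
    return (from x in Task_FromResult(5)
            where x != 5
            from y in Task_FromResult(true)
            where y
            select x.ToString() + y.ToString()
           ).IfCanceled("Nothing");
}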
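
For the query syntax above to compile, the compiler also needs a three-argument SelectMany on Task<T>. The linked blog post supplies one; what follows is only an illustrative sketch of the shape such a method could take (the class name TaskLinqSketch is hypothetical):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical class name; only the SelectMany signature is dictated by the compiler.
public static class TaskLinqSketch
{
    // Enables "from x in taskA from y in taskB select f(x, y)".
    // The second task is only requested after the first has succeeded.
    public static Task<TResult> SelectMany<TSource, TCollection, TResult>(
        this Task<TSource> source,
        Func<TSource, Task<TCollection>> collectionSelector,
        Func<TSource, TCollection, TResult> resultSelector)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (collectionSelector == null) throw new ArgumentNullException("collectionSelector");
        if (resultSelector == null) throw new ArgumentNullException("resultSelector");

        var tcs = new TaskCompletionSource<TResult>();
        source.ContinueWith(first =>
        {
            if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
            else if (first.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    collectionSelector(first.Result).ContinueWith(second =>
                    {
                        if (second.IsFaulted) tcs.TrySetException(second.Exception.InnerExceptions);
                        else if (second.IsCanceled) tcs.TrySetCanceled();
                        else
                        {
                            try { tcs.TrySetResult(resultSelector(first.Result, second.Result)); }
                            catch (Exception ex) { tcs.TrySetException(ex); }
                        }
                    });
                }
                catch (Exception ex)
                {
                    // collectionSelector threw before producing a task.
                    tcs.TrySetException(ex);
                }
            }
        });
        return tcs.Task;
    }
}
```

Because a failed where cancels its task, the cancellation passes through SelectMany untouched and IfCanceled at the end of the chain turns it into the default value.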

The Timeout method from Brandon's answer (rewritten without async/await if needed) can be stuck on the end of the chain for an overall timeout, and/or after each step in the chain if you want to keep further steps from running once the overall timeout is reached. Another way to interrupt the chain would be to make all the individual steps take a cancellation token, and to modify the Timeout method to take the CancellationTokenSource and cancel it when a timeout occurs, in addition to throwing the timeout exception.
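
To make that second option concrete, here is one way such a method might look on .NET 4.0. This is not Brandon's code; the names WithTimeout and TimeoutSketch are assumptions for this sketch, and it uses a plain System.Threading.Timer since Task.Delay is not available before 4.5:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names; a sketch of a timeout that also signals a shared CancellationTokenSource.
public static class TimeoutSketch
{
    public static Task<T> WithTimeout<T>(
        this Task<T> task, TimeSpan timeout, CancellationTokenSource cts)
    {
        if (task == null) throw new ArgumentNullException("task");
        if (cts == null) throw new ArgumentNullException("cts");

        var tcs = new TaskCompletionSource<T>();

        // One-shot timer: a period of -1 ms disables periodic firing.
        var timer = new Timer(_ =>
        {
            // Only cancel the shared source if the timeout actually won the race.
            if (tcs.TrySetException(new TimeoutException()))
                cts.Cancel();
        }, null, timeout, TimeSpan.FromMilliseconds(-1));

        task.ContinueWith(completed =>
        {
            timer.Dispose();
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(completed.Result);
        }, TaskContinuationOptions.ExecuteSynchronously);

        return tcs.Task;
    }
}
```

Steps further down the chain would then check cts.Token (or cts.IsCancellationRequested) before starting any work, so nothing new runs once the overall limit has passed.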

EDIT (Brent Arias)

Taking fantastic ideas from what you have presented, I've devised what I think is the final answer from my POV. It is based on the .NET 4.0 extension methods found in the ParallelExtensionsExtras NuGet package. The sample below adds a third task, to help illustrate the "feel" of programming for sequential tasks, given my stated requirements:

public Task<string> DoWorkInSequence()
{
    var cts = new CancellationTokenSource();

    Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });

    Task<int> AlphaTask = Task.Factory
        .StartNew(() => 4 )
        .Where(x => x != 5 && !cts.IsCancellationRequested);

    Task<bool> BravoTask = AlphaTask
        .Then(x => true)
        .Where(x => x && !cts.IsCancellationRequested);

    Task<int> DeltaTask = BravoTask
        .Then(x => 7)
        .Where(x => x != 8);

    Task<string> final = Task.Factory
        .WhenAny(DeltaTask, timer)
        .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
            ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");

    //This is here just for experimentation.  Placing it at different points
    //above will have varying effects on what tasks were cancelled at a given point in time.
    cts.Cancel();

    return final;
}

There are a few key observations I've made in this discussion and joint effort:

  • Using the "Then" extension is nice in trivial cases, but its applicability is notably limited. For more complex cases it would be necessary to replace it with, for example, .ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default). When replacing "Then" with "ContinueWith" for my stated scenario, it is critical to add the OnlyOnRanToCompletion option.
  • Using a Timeout extension ultimately just doesn't work in my scenario. This is because it will only cause a cancellation of the Task it is immediately attached to, instead of cancelling all the antecedent Task instances in the sequence. This is why I switched to the StartNewDelayed(...) tactic and added an explicit cancellation check in each Where clause.
  • Although the ParallelExtensionsExtras library defines the LINQ to Tasks methods that you used, I concluded it is best to stay away from LINQ-ish appearances with Tasks. This is because Tasks with LINQ are highly esoteric; it would likely confuse the hell out of the average developer. It is hard enough to get them to understand asynchronous coding. Even the author of LINQ to Tasks said "How useful this LINQ implementation is in practice is arguable, but at the very least it provides for an interesting thought exercise." Yes, agreed, an interesting thought exercise. Of course, I must make an exception for at least the "Where" LINQ to Tasks method, as it played a key role in the solution I listed above.
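
To illustrate the first point, here is a small sketch of what OnlyOnRanToCompletion buys you when swapping "Then" for "ContinueWith". The class and method names are invented for the example; only the ContinueWith overload itself comes from the TPL:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; shows that Bravo never runs when Alpha fails.
public static class ContinuationSketch
{
    public static TaskStatus RunChain(bool alphaFails, out bool bravoRan)
    {
        var cts = new CancellationTokenSource();
        bool ran = false;

        Task<int> alpha = Task.Factory.StartNew<int>(() =>
        {
            if (alphaFails) throw new InvalidOperationException("Alpha failed");
            return 4;
        });

        // Without OnlyOnRanToCompletion the lambda would run even after a fault.
        Task<bool> bravo = alpha.ContinueWith(
            x => { ran = true; return true; },
            cts.Token,
            TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Default);

        try { bravo.Wait(); }
        catch (AggregateException) { /* a skipped continuation surfaces as a cancellation */ }

        // Observe Alpha's exception so it is not left unobserved on .NET 4.0.
        if (alpha.IsFaulted) { var observed = alpha.Exception; }

        bravoRan = ran;
        return bravo.Status;
    }
}
```

When Alpha faults, the continuation is never scheduled and ends up in the Canceled state, which is the same signal the "Where" extension produces for an invalid result.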
Gideon Engelberth answered Sep 22 '22 02:09


public Task<string> DoWorkInSequence()
{
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    Func<int, int> BravoFunc = x => 2 * x;

    //Prepare for Rx, and set a filter so the sequence completes early (empty)
    //when Alpha returns an invalid result.
    IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);

    return AsyncAlpha
        .Do(x => Console.WriteLine(x))  //This is how you "Do WORK in sequence"
        .Select(BravoFunc)              //This is how you map results from Alpha
                                        //via a second method.
        .Select(x => x.ToString())
        .Timeout(TimeSpan.FromMilliseconds(200))
        .DefaultIfEmpty("Nothing")      //An empty sequence maps to "Nothing"
        .ToTask();                      //No TaskCompletionSource needed
}

Ultimately, however, I would just do it all in TPL if you want Tasks, or use Observable.ToTask(this IObservable<T> observable) rather than a TaskCompletionSource.

Aron answered Sep 18 '22 02:09