I've got two .NET Task objects that I may wish to run in parallel or in sequence. In either case, I don't want to block a thread to wait for them. As it turns out, Reactive Extensions make the parallel story simply beautiful. But when I try to arrange the tasks in sequence, the code works but just feels awkward.
I'd like to know if anyone can show how to make the sequential version more concise, or as effortless to code as the parallel version. The answer does not need to use Reactive Extensions.
For reference, here are my two solutions for both parallel and sequential processing.
This is pure joy:
public Task<string> DoWorkInParallel()
{
    var result = new TaskCompletionSource<string>();
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    Task<bool> BravoTask = Task.Factory.StartNew(() => true);
    //Prepare for Rx, and set filters to allow 'Zip' to terminate early
    //in some cases.
    IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
    IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);
    Observable
        .Zip(
            AsyncAlpha,
            AsyncBravo,
            (x, y) => y.ToString() + x.ToString())
        .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
            (x) => { result.TrySetResult(x); },
            (x) => { result.TrySetException(x.GetBaseException()); },
            () => { result.TrySetResult("Nothing"); });
    return result.Task;
}
This works but is just clumsy:
public Task<string> DoWorkInSequence()
{
    var result = new TaskCompletionSource<string>();
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    AlphaTask.ContinueWith(x =>
    {
        if (x.IsFaulted)
        {
            result.TrySetException(x.Exception.GetBaseException());
        }
        else
        {
            if (x.Result != 5)
            {
                Task<bool> BravoTask = Task.Factory.StartNew(() => true);
                BravoTask.ContinueWith(y =>
                {
                    if (y.IsFaulted)
                    {
                        result.TrySetException(y.Exception.GetBaseException());
                    }
                    else
                    {
                        if (y.Result)
                        {
                            result.TrySetResult(x.Result.ToString() + y.Result.ToString());
                        }
                        else
                        {
                            result.TrySetResult("Nothing");
                        }
                    }
                });
            }
            else
            {
                result.TrySetResult("Nothing");
            }
        }
    });
    return result.Task;
}
The sequential code above has become a mess, and I haven't even added the timeout capability to match the parallel version!
For those answering, please be mindful that:
- The sequential scenario should permit the arrangement where the output of the first task feeds the input of the second. My sample "awkward" code above could easily have been arranged to achieve that.
- I'm interested in a .NET 4.5 answer, but a .NET 4.0 answer is equally or more important for me.
- Tasks 'Alpha' and 'Bravo' have a combined time limit of 200ms in which to complete; they do not have 200ms each. This is true in the sequential case as well.
- The TaskCompletionSource's task must complete early, before both tasks complete, if either task returns an invalid result. An invalid result is either [AlphaTask:5] or [BravoTask:false], as indicated by the explicit tests in the sample code.
- Update 8/8: Clarification: in the sequential case, BravoTask should not execute at all if AlphaTask is not successful or if the timeout has already occurred.
- Assume neither AlphaTask nor BravoTask can block. Not that it matters, but in my real-world scenario they are actually async WCF service calls.
Maybe there is an aspect of Rx I could have exploited to clean up the sequential version. But even plain Task programming should have a better story, I'd imagine. We'll see.
ERRATA: In both code samples I changed the return type to Task<string>, as answerers correctly pointed out that I should not have been returning a TaskCompletionSource.
If you can use async/await, Brandon has a nice answer. If you are still on VS2010, the first thing I would do to clean up the sequential version is to get an extension method like the Then method Stephen Toub described in a blog post. I would also implement a Task.FromResult method if you are not using .NET 4.5. With those, you could get:
public Task<string> DoWorkInSequence()
{
    return Task.FromResult(4)
        .Then(x =>
        {
            if (x != 5)
            {
                return Task.FromResult(true)
                    .Then(y =>
                    {
                        if (y)
                        {
                            return Task.FromResult(x.ToString() + y.ToString());
                        }
                        else
                        {
                            return Task.FromResult("Nothing");
                        }
                    });
            }
            else
            {
                return Task.FromResult("Nothing");
            }
        });
}
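The code above assumes a Task-returning Then and, on .NET 4.0, a FromResult substitute. Stephen Toub's version is not reproduced here; what follows is a minimal sketch, assuming .NET 4.0 and no async/await, of what those two helpers could look like. The class name TaskSequenceExtensions and the helper name Task_FromResult are illustrative (the underscore avoids a clash with the built-in Task.FromResult on .NET 4.5).

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helpers (names are illustrative, not from a library).
public static class TaskSequenceExtensions
{
    // .NET 4.0 stand-in for Task.FromResult, which only exists from .NET 4.5 on.
    public static Task<T> Task_FromResult<T>(T value)
    {
        var tcs = new TaskCompletionSource<T>();
        tcs.SetResult(value);
        return tcs.Task;
    }

    // Runs 'next' only if 'first' ran to completion; a fault or cancellation
    // of 'first' is copied to the returned task and 'next' is never called.
    public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next)
    {
        if (first == null) throw new ArgumentNullException("first");
        if (next == null) throw new ArgumentNullException("next");

        var tcs = new TaskCompletionSource<T2>();
        first.ContinueWith(antecedent =>
        {
            if (antecedent.IsFaulted) { tcs.TrySetException(antecedent.Exception.InnerExceptions); return; }
            if (antecedent.IsCanceled) { tcs.TrySetCanceled(); return; }
            try
            {
                Task<T2> second = next(antecedent.Result);
                if (second == null) { tcs.TrySetCanceled(); return; }
                // Forward the second task's outcome so callers see a single Task<T2>.
                second.ContinueWith(inner =>
                {
                    if (inner.IsFaulted) tcs.TrySetException(inner.Exception.InnerExceptions);
                    else if (inner.IsCanceled) tcs.TrySetCanceled();
                    else tcs.TrySetResult(inner.Result);
                }, TaskContinuationOptions.ExecuteSynchronously);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex); // 'next' itself threw
            }
        }, TaskContinuationOptions.ExecuteSynchronously);
        return tcs.Task;
    }
}
```

With these in scope, the DoWorkInSequence above should compile on .NET 4.0 once each Task.FromResult call is replaced with TaskSequenceExtensions.Task_FromResult.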
Also, you should generally return a Task instead of a TaskCompletionSource (you can get the Task from the TaskCompletionSource's .Task property), since you don't want the caller to set a result on the task you are returning to them.
Brandon's answer also gives a good way to implement the timeout functionality (adjusting for the lack of async/await keywords).
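Brandon's Timeout method is not shown on this page, so as a stand-in here is a minimal sketch, assuming .NET 4.0 (no async/await and no Task.Delay), of a timeout wrapper built from a System.Threading.Timer and a TaskCompletionSource. WithTimeout is a hypothetical name. Note that it only stops waiting: the wrapped task keeps running after the timeout fires.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical extension; not Brandon's actual code.
public static class TimeoutExtensions
{
    public static Task<T> WithTimeout<T>(this Task<T> task, int millisecondsTimeout)
    {
        if (task == null) throw new ArgumentNullException("task");
        var tcs = new TaskCompletionSource<T>();

        // One-shot timer: if it fires first, the returned task faults with TimeoutException.
        var timer = new Timer(_ => tcs.TrySetException(new TimeoutException()),
                              null, millisecondsTimeout, Timeout.Infinite);

        // If the real work finishes first, copy its outcome and stop the timer.
        task.ContinueWith(completed =>
        {
            timer.Dispose();
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(completed.Result);
        }, TaskContinuationOptions.ExecuteSynchronously);

        return tcs.Task;
    }
}
```

Whichever side calls TrySet* first wins; the loser's call simply returns false, so no locking is needed.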
EDIT To reduce arrow code, we can implement more of the LINQ methods. A SelectMany implementation is provided in the previously linked blog post. The other methods we will need for LINQ are Select and Where. These should be fairly straightforward once you've done Then and SelectMany, but here they are:
public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
    if (task == null) throw new ArgumentNullException("task");
    if (predicate == null) throw new ArgumentNullException("predicate");
    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                if (predicate(completed.Result))
                    tcs.TrySetResult(completed.Result);
                else
                    tcs.TrySetCanceled();
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    });
    return tcs.Task;
}

public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
    if (task == null) throw new ArgumentNullException("task");
    if (selector == null) throw new ArgumentNullException("selector");
    var tcs = new TaskCompletionSource<TResult>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                tcs.TrySetResult(selector(completed.Result));
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    });
    return tcs.Task;
}
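For the from ... from ... query syntax, the compiler also needs the three-argument SelectMany overload. The blog post's version is not reproduced here; this is a minimal sketch in the same style as Where and Select above (the class name TaskLinqSelectMany is illustrative). The second task is created only after the first succeeds, so it can consume the first result, which is the output-feeds-input arrangement the question asks for.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch; not the code from the linked blog post.
public static class TaskLinqSelectMany
{
    public static Task<TResult> SelectMany<T, TCollection, TResult>(
        this Task<T> source,
        Func<T, Task<TCollection>> collectionSelector,
        Func<T, TCollection, TResult> resultSelector)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (collectionSelector == null) throw new ArgumentNullException("collectionSelector");
        if (resultSelector == null) throw new ArgumentNullException("resultSelector");

        var tcs = new TaskCompletionSource<TResult>();
        source.ContinueWith(outer =>
        {
            if (outer.IsFaulted) { tcs.TrySetException(outer.Exception.InnerExceptions); return; }
            if (outer.IsCanceled) { tcs.TrySetCanceled(); return; }
            try
            {
                // Only start the second step once the first has succeeded.
                Task<TCollection> next = collectionSelector(outer.Result);
                next.ContinueWith(inner =>
                {
                    if (inner.IsFaulted) tcs.TrySetException(inner.Exception.InnerExceptions);
                    else if (inner.IsCanceled) tcs.TrySetCanceled();
                    else
                    {
                        try { tcs.TrySetResult(resultSelector(outer.Result, inner.Result)); }
                        catch (Exception ex) { tcs.TrySetException(ex); }
                    }
                });
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex); // collectionSelector threw (or returned null)
            }
        });
        return tcs.Task;
    }
}
```

A query such as from x in a from y in b select f(x, y) is translated by the compiler into a.SelectMany(x => b, (x, y) => f(x, y)), so this single overload is enough for two chained from clauses.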
After that, one final non-LINQ extension method allows us to return a default value when cancelled:
public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
    if (task == null) throw new ArgumentNullException("task");
    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
        else tcs.TrySetResult(completed.Result);
    });
    return tcs.Task;
}
And the new and improved DoWork (sans timeout):
public static Task<string> DoWorkInSequence()
{
    return (from x in Task_FromResult(5)
            where x != 5
            from y in Task_FromResult(true)
            where y
            select x.ToString() + y.ToString()
           ).IfCanceled("Nothing");
}
The Timeout method from Brandon's answer (once rewritten, if needed without async/await) can be stuck on the end of the chain for an overall timeout and/or after each step in the chain if you want to keep further steps from running once the overall timeout is reached. Another possibility for the chain interruption would be to make all the individual steps take a cancellation token and modify the Timeout method to take the CancellationTokenSource and cancel it if a timeout occurs, as well as throwing the timeout exception.
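The last idea, steps that share a CancellationTokenSource which the timeout cancels, can be sketched as follows for .NET 4.0. TimeoutAndCancel and DoWorkInSequence are hypothetical names, and Thread.Sleep only simulates a slow AlphaTask (the question's real tasks do not block). Because the Bravo step is registered with cts.Token and OnlyOnRanToCompletion, it never starts once the 200ms budget is spent.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the "shared CancellationTokenSource" approach.
public static class TokenTimeoutSketch
{
    // Faults the returned task with TimeoutException and cancels 'cts' if 'task'
    // has not finished in time; otherwise passes the task's outcome through.
    public static Task<T> TimeoutAndCancel<T>(this Task<T> task, int millisecondsTimeout,
                                              CancellationTokenSource cts)
    {
        var tcs = new TaskCompletionSource<T>();
        var timer = new Timer(_ =>
        {
            if (tcs.TrySetException(new TimeoutException()))
                cts.Cancel(); // keep steps that have not started yet from running
        }, null, millisecondsTimeout, Timeout.Infinite);

        task.ContinueWith(done =>
        {
            timer.Dispose();
            if (done.IsFaulted) tcs.TrySetException(done.Exception.InnerExceptions);
            else if (done.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(done.Result);
        }, TaskContinuationOptions.ExecuteSynchronously);
        return tcs.Task;
    }

    public static Task<string> DoWorkInSequence(int alphaDelayMs)
    {
        var cts = new CancellationTokenSource();
        // Alpha stand-in; Sleep only simulates latency for the sketch.
        Task<int> alpha = Task.Factory.StartNew(() => { Thread.Sleep(alphaDelayMs); return 4; }, cts.Token);
        // Bravo stand-in: skipped if Alpha did not succeed or the token is already cancelled.
        Task<string> bravo = alpha.ContinueWith(a => a.Result.ToString() + true.ToString(),
            cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
        // One overall 200ms budget for the whole chain.
        return bravo.TimeoutAndCancel(200, cts);
    }
}
```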
EDIT (Brent Arias)
Taking fantastic ideas from what you have presented, I've devised what I think is the final answer from my POV. It is based on the .NET 4.0 extension methods found in the ParallelExtensionsExtras NuGet package. The sample below adds a third task, to help illustrate the "feel" of programming for sequential tasks, given my stated requirements:
public Task<string> DoWorkInSequence()
{
    var cts = new CancellationTokenSource();
    Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });
    Task<int> AlphaTask = Task.Factory
        .StartNew(() => 4)
        .Where(x => x != 5 && !cts.IsCancellationRequested);
    Task<bool> BravoTask = AlphaTask
        .Then(x => true)
        .Where(x => x && !cts.IsCancellationRequested);
    Task<int> DeltaTask = BravoTask
        .Then(x => 7)
        .Where(x => x != 8);
    Task<string> final = Task.Factory
        .WhenAny(DeltaTask, timer)
        .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
            ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString()
            : "Nothing");
    //This is here just for experimentation. Placing it at different points
    //above will have varying effects on what tasks were cancelled at a given point in time.
    cts.Cancel();
    return final;
}
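The chain above depends on a cancelled step (for example a Where whose predicate fails) keeping later steps from running. If a Then is rewritten as a plain ContinueWith, that only holds when the continuation is registered with TaskContinuationOptions.OnlyOnRanToCompletion; by default a continuation runs whatever the antecedent's final status. A minimal sketch contrasting the two (the class and method names are hypothetical):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationOptionsDemo
{
    // An already-cancelled task, standing in for a Where clause that rejected its value.
    public static Task<int> CanceledTask()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetCanceled();
        return tcs.Task;
    }

    // Plain ContinueWith: the delegate runs even though the antecedent was cancelled.
    public static bool RunsWithoutOption()
    {
        bool ran = false;
        CanceledTask().ContinueWith(_ => { ran = true; }).Wait();
        return ran;
    }

    // With OnlyOnRanToCompletion the delegate is skipped and the continuation
    // itself ends up Canceled, so the cancellation keeps flowing down the chain.
    public static bool RunsWithOption(out TaskStatus continuationStatus)
    {
        bool ran = false;
        Task continuation = CanceledTask().ContinueWith(_ => { ran = true; },
            CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Default);
        try { continuation.Wait(); }
        catch (AggregateException) { /* expected: wraps a TaskCanceledException */ }
        continuationStatus = continuation.Status;
        return ran;
    }
}
```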
There are a few key observations I've made in this discussion and joint effort:
- .ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default): when replacing "Then" with "ContinueWith" for my stated scenario, it is critical to add the OnlyOnRanToCompletion option.
- I used the StartNewDelayed(...) tactic and added an explicit cancellation check in each Where clause.

public Task<string> DoWorkInSequence()
{
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    Func<int, int> BravoFunc = x => 2 * x;
    //Set a filter so the sequence can terminate early on an invalid result.
    IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
    return AsyncAlpha
        .Do(x => Console.WriteLine(x))   //This is how you "Do WORK in sequence"
        .Select(BravoFunc)               //This is how you map results from Alpha
                                         //via a second method.
        .Select(x => x.ToString())
        .DefaultIfEmpty("Nothing")       //Same fallback as an onCompleted handler returning "Nothing"
        .Timeout(TimeSpan.FromMilliseconds(200))
        .ToTask();                       //No TaskCompletionSource needed
}
Ultimately, however, I would just do it all in TPL if you want Tasks, or use Observable.ToTask(this IObservable<T> observable) as opposed to using a TaskCompletionSource.