It's not possible to await a List<Task> that keeps changing, because Task.WhenAny(List<Task>) takes a copy of the List<Task> at the moment it is called.
What's an appropriate pattern for
List<Task> taskList = new List<Task>();
await Task.WhenAny(taskList);
when taskList could have other tasks added to it after the first WhenAny is called?
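The copying behaviour can be seen in isolation with a minimal C# sketch. The class name WhenAnySnapshotDemo and the use of TaskCompletionSource are illustrative assumptions, not part of the original code. The sketch starts a WhenAny over a list that holds one never-completing task, then adds an already-completed task to the list. The earlier WhenAny does not notice the new task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class WhenAnySnapshotDemo
{
    static async Task Main()
    {
        // A task that only completes when we explicitly complete it.
        var pending = new TaskCompletionSource<int>();
        var taskList = new List<Task> { pending.Task };

        // WhenAny enumerates taskList now and works on its own copy.
        Task<Task> whenAny = Task.WhenAny(taskList);

        // Adding an already-completed task afterwards does not affect the earlier WhenAny.
        taskList.Add(Task.CompletedTask);
        await Task.Delay(100);
        Console.WriteLine($"WhenAny completed after Add: {whenAny.IsCompleted}");

        // Only completing a task that was in the list at call time releases it.
        pending.SetResult(42);
        Task winner = await whenAny;
        Console.WriteLine($"Winner is the original task: {winner == pending.Task}");
    }
}
```

This is why a loop that calls WhenAny once per iteration only sees new tasks after the task it is currently waiting on has finished.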
Full demo code demonstrating the issue:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

class Program
{
static readonly List<Task<int>> taskList = new List<Task<int>>();
static readonly Random rnd = new Random(1);
static async Task<int> RunTaskAsync(int taskID,int taskDuration)
{
await Task.Yield();
Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
await Task.Delay(taskDuration); // mimic some work
return taskID;
}
static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
{
// Add numTasks asynchronously to the taskList
// The first task is added synchronously, then the remaining adds are yielded to a worker
taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with one task
// the remaining task runs are yielded to a worker thread
for (int i = 2; i <= numTasks; i++)
{
await Task.Delay(rnd.Next(minDelay, maxDelay));
taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
}
}
static async Task Main(string[] args)
{
Stopwatch sw = new Stopwatch(); sw.Start();
// Start a fire-and-forget task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
// while there are tasks to complete, use the main thread to process them as they complete
while(taskList.Count > 0)
{
var t = await Task.WhenAny(taskList);
taskList.Remove(t);
var i = await t;
Console.WriteLine("Task {0} found to be completed at: {1}",i,sw.Elapsed);
}
// All tasks have completed successfully; exit the main thread
}
}
Console output, showing that the WhenAny() loop only noticed that the other tasks had completed after it had found and removed the 60-second task:
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Starting Task: 3 with a duration of 24 seconds
Starting Task: 4 with a duration of 15 seconds
Starting Task: 5 with a duration of 28 seconds
Starting Task: 6 with a duration of 21 seconds
Starting Task: 7 with a duration of 11 seconds
Starting Task: 8 with a duration of 29 seconds
Starting Task: 9 with a duration of 21 seconds
Starting Task: 10 with a duration of 20 seconds
Task 1 found to be completed at: 00:01:00.1305811
Task 2 found to be completed at: 00:01:00.1312951
Task 3 found to be completed at: 00:01:00.1315689
Task 4 found to be completed at: 00:01:00.1317623
Task 5 found to be completed at: 00:01:00.1319427
Task 6 found to be completed at: 00:01:00.1321225
Task 7 found to be completed at: 00:01:00.1323002
Task 8 found to be completed at: 00:01:00.1324379
Task 9 found to be completed at: 00:01:00.1325962
Task 10 found to be completed at: 00:01:00.1327377
Thanks!
There's a problem with the code you've shown: it doesn't have a sensible communication pipeline between the worker and the task creator. You need some kind of messaging mechanism to notify the worker about new tasks (and about when there are no more tasks) so that it can react to them. That is something you have to figure out for your concurrent system, and the exact implementation is tangential to the question, so I'll just assume our worker has OnTaskAdded(Task<int> task) and OnEndAsync() methods.
From what you're saying, you don't really want to wait until any task completes, but rather to execute something for each task when it completes. SEE UPDATED ANSWER BELOW.
That can be achieved with ContinueWith:
class Worker
{
private List<Task> _tasks = new List<Task>();
private readonly Stopwatch _stopwatch = new Stopwatch();
// Start the stopwatch in the constructor or in some kind of a StartProcessing method.
void OnTaskAdded(Task<int> task)
{
var taskWithContinuation = task.ContinueWith(t =>
Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
_tasks.Add(taskWithContinuation);
}
async Task OnEndAsync()
{
// We're finishing work and there will be no more tasks, it's safe to await them all now.
await Task.WhenAll(_tasks);
}
}
EDIT: After all that moralizing talk about ensuring a sensible messaging pipeline, I reckoned I could actually give you a quick-and-dirty implementation, just so you can see that it works:
// DISCLAIMER: NOT PRODUCTION CODE!!!
public static async Task Main()
{
Stopwatch sw = new Stopwatch(); sw.Start();
// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
var internalList = new List<Task>();
// while there are tasks to complete, use the main thread to process them as they complete
var i = 0;
while (i < 10)
{
while (taskList.Count <= i)
{
// No new tasks, check again after a delay -- THIS IS VERY BAD!
await Task.Delay(100);
}
Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
var taskWithContinuation = taskList[i].ContinueWith(t =>
Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
internalList.Add(taskWithContinuation);
++i;
}
await Task.WhenAll(internalList);
}
Let me stress that again: this is not production-quality code! Actively waiting for more tasks, ugh. Its output is something like this:
Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695
You can see that lines are a little bit shuffled due to the nature of multithreaded work, but the timestamps are accurate.
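Update: well, I'm pretty dumb; I've just invited you into an anti-pattern. Using ContinueWith is dangerous (for example, it runs on TaskScheduler.Current unless you pass a scheduler, and reading t.Result on a faulted task throws an AggregateException inside the continuation). It's also overcomplicated: async/await was introduced to free us from scheduling continuations manually. You can just wrap your Task<int> in an operation that awaits it and logs the time.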
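Here is a small C# sketch of one concrete way ContinueWith bites. The names ContinueWithPitfallDemo and FailAsync are hypothetical. When the antecedent task faults, reading t.Result inside the continuation throws an AggregateException, which then faults the continuation. By contrast, await rethrows the original exception unwrapped. The explicit TaskScheduler.Default argument is passed because ContinueWith otherwise uses TaskScheduler.Current.

```csharp
using System;
using System.Threading.Tasks;

class ContinueWithPitfallDemo
{
    // Hypothetical operation that always fails asynchronously.
    static async Task<int> FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("boom");
    }

    static async Task Main()
    {
        // ContinueWith: t.Result on a faulted antecedent throws an AggregateException,
        // which becomes the fault of the continuation task.
        Task continuation = FailAsync().ContinueWith(
            t => Console.WriteLine(t.Result),
            TaskScheduler.Default); // explicit scheduler; the default would be TaskScheduler.Current
        try
        {
            await continuation;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"ContinueWith: {ex.GetType().Name}");
        }

        // await: the original exception is rethrown directly.
        try
        {
            Console.WriteLine(await FailAsync());
        }
        catch (Exception ex)
        {
            Console.WriteLine($"await: {ex.GetType().Name}");
        }
    }
}
```

The first handler sees the wrapper type and the second sees the real InvalidOperationException. This is one reason an async wrapper is easier to get right than a hand-written continuation.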
class Worker
{
private List<Task> _tasks = new List<Task>();
private readonly Stopwatch _stopwatch = new Stopwatch();
// Start the stopwatch in the constructor or in some kind of a StartProcessing method.
void OnTaskAdded(Task<int> task)
{
var taskWithContinuation = ContinueWithLog(task);
_tasks.Add(taskWithContinuation);
}
async Task OnEndAsync()
{
// We're finishing work and there will be no more tasks, it's safe to await them all now.
await Task.WhenAll(_tasks);
}
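// Must be async so it can await the source task; logs the elapsed time once the task completes.
private async Task ContinueWithLog(Task<int> task)
{
var i = await task;
Console.WriteLine("Task {0} found to be completed at: {1}", i, _stopwatch.Elapsed);
}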
}
Using your example code for a quick-and-dirty PoC:
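using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

class Program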
{
static readonly List<Task<int>> taskList = new List<Task<int>>();
static readonly Random rnd = new Random(1);
static readonly Stopwatch sw = new Stopwatch();
static async Task<int> RunTaskAsync(int taskID, int taskDuration)
{
await Task.Yield();
Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
await Task.Delay(taskDuration); // mimic some work
return taskID;
}
static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
{
// Add numTasks asynchronously to the taskList
// The first task is added synchronously, then the remaining adds are yielded to a worker
taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with one task
// the remaining task runs are yielded to a worker thread
for (int i = 2; i <= numTasks; i++)
{
await Task.Delay(rnd.Next(minDelay, maxDelay));
taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
}
}
public static async Task ContinueWithLog(Task<int> source)
{
var i = await source;
Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
}
public static async Task Main()
{
sw.Start();
// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
var internalList = new List<Task>();
// while there are tasks to complete, use the main thread to process them as they complete
var i = 0;
while (i < 10)
{
while (taskList.Count <= i)
{
// No new tasks, check again after a delay -- THIS IS VERY BAD!
await Task.Delay(100);
}
Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
internalList.Add(ContinueWithLog(taskList[i]));
++i;
}
await Task.WhenAll(internalList);
}
}
Output:
Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952
This is the idiomatic way to achieve what you want. I'm sorry for misleading you with ContinueWith at first; it's unnecessary and error-prone, as we both now know.
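The remaining weak spot in the PoC above is the polling loop. Below is a sketch of the "sensible communication pipeline" mentioned at the start. It assumes System.Threading.Channels is available, which is part of .NET Core 3.0 and later and a NuGet package on .NET Framework. The shared list is replaced by a Channel<Task<int>> whose writer is completed when no more tasks will be added. RunTaskAsync here is a hypothetical, shortened stand-in for the question's method, and the class name is also illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;

class ChannelPipelineSketch
{
    static readonly Stopwatch sw = Stopwatch.StartNew();

    // Hypothetical, shortened stand-in for the question's RunTaskAsync.
    static async Task<int> RunTaskAsync(int taskID, int durationMs)
    {
        await Task.Delay(durationMs); // mimic some work
        return taskID;
    }

    // Same idea as ContinueWithLog above: await the task, then log when it finished.
    static async Task ContinueWithLog(Task<int> source)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }

    static async Task Main()
    {
        var channel = Channel.CreateUnbounded<Task<int>>();

        // Producer: publishes each task as soon as it is started, then signals "no more tasks".
        var producer = Task.Run(async () =>
        {
            channel.Writer.TryWrite(RunTaskAsync(1, 1000)); // the long-running task
            for (int i = 2; i <= 5; i++)
            {
                await Task.Delay(50);
                channel.Writer.TryWrite(RunTaskAsync(i, i * 100));
            }
            channel.Writer.Complete();
        });

        // Consumer: WaitToReadAsync returns false once the writer is completed and the channel is empty.
        var logged = new List<Task>();
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out var task))
            {
                logged.Add(ContinueWithLog(task));
            }
        }

        await producer;
        await Task.WhenAll(logged);
    }
}
```

The consumer loop exits on its own once Writer.Complete() has been called and every queued task has been read, so no busy-waiting is needed. With these durations, the shorter tasks should be logged well before task 1.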
A List<Task> is not a suitable container for this kind of job because it does not support the notion of completion. So you can't tell whether more tasks will be added to the list, and therefore when you can stop waiting. There are multiple alternatives, though.
- BlockingCollection<Task>. The producer calls Add and finally CompleteAdding to signal that it has finished adding tasks. The consumer simply enumerates GetConsumingEnumerable(). Very simple, but blocking by nature (not async).
- BufferBlock<Task>. The producer calls SendAsync and finally Complete to signal that it has finished adding tasks. The consumer enumerates asynchronously using OutputAvailableAsync and TryReceive. Requires the TPL Dataflow package on .NET Framework (it's included in .NET Core).
- Channel<Task>. The producer calls Writer.WriteAsync and finally Writer.Complete to signal that it has finished adding tasks. The consumer enumerates asynchronously using Reader.WaitToReadAsync and Reader.TryRead. Requires the System.Threading.Channels package on .NET Framework (it's included in .NET Core).
- An IObservable<Task> + IObserver<Task> pair. The observer subscribes to the observable and then starts receiving notifications about new tasks. The last notification is OnCompleted(), which signals that no more notifications will be produced. The Reactive Extensions library includes a ton of methods for manipulating observables. One of them is the Merge operator, which can be used to await all the tasks by exploiting the fact that a Task<T> can be converted to an IObservable<T> that produces a single OnNext notification. This approach may seem quite eccentric, and it's probably not worth the investment of learning this technology (the reactive programming paradigm) unless you frequently deal with incoming streams of data that you'd like to filter, transform, combine, etc.

Update: In retrospect, the first three options cannot be used as-is, because you also want to await the tasks. So my suggestion now is to use a TransformBlock<Task, Task> instead of a BufferBlock<Task>.
var block = new TransformBlock<Task, Task>(async task =>
{
try
{
await task;
}
catch { } // suppress exceptions
return task;
});
Example of a producer that adds tasks to the block:
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(100);
Console.WriteLine($"Sending {i}");
await block.SendAsync(Task.Delay(i * 100));
}
block.Complete();
});
Example of a consumer that receives the completed tasks from the block:
var consumer = Task.Run(async () =>
{
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var task))
{
Console.WriteLine($"Task Completed: {task.Status}");
}
}
});
The tasks are received in the same order they were added to the block. If you want to receive them as soon as they complete, configure the block like this:
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = Int32.MaxValue,
EnsureOrdered = false
}
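Here is a self-contained sketch that puts the pieces of this answer together. It assumes the TPL Dataflow library (System.Threading.Tasks.Dataflow) is available; add the NuGet package if your target framework doesn't include it. WorkAsync and its delays are made up for illustration. Task 1 runs longest, so with EnsureOrdered = false it should be received after tasks 2 and 3 even though it was sent first.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class TransformBlockSketch
{
    // Hypothetical work item: completes after delayMs and returns its id.
    static async Task<int> WorkAsync(int id, int delayMs)
    {
        await Task.Delay(delayMs);
        return id;
    }

    static async Task Main()
    {
        var block = new TransformBlock<Task, Task>(async task =>
        {
            try
            {
                await task;
            }
            catch { } // suppress exceptions; the consumer can inspect task.Status
            return task;
        }, new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = Int32.MaxValue, // await all incoming tasks concurrently
            EnsureOrdered = false // emit each task as soon as it completes
        });

        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++)
            {
                Console.WriteLine($"Sending {i}");
                // Task 1 gets the longest delay (900 ms), task 3 the shortest (300 ms).
                Task work = WorkAsync(i, (4 - i) * 300);
                await block.SendAsync(work);
            }
            block.Complete();
        });

        var consumer = Task.Run(async () =>
        {
            while (await block.OutputAvailableAsync())
            {
                while (block.TryReceive(out var task))
                {
                    // The task has already completed here, so reading Result does not block.
                    int id = ((Task<int>)task).Result;
                    Console.WriteLine($"Task {id} completed: {task.Status}");
                }
            }
        });

        await Task.WhenAll(producer, consumer);
    }
}
```

With EnsureOrdered left at its default of true, the same program would hold tasks 2 and 3 back until task 1 had been emitted.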