In the code below I want to synchronize the reporting of the results of a list of tasks. This works now because task.Result blocks until the task completes. However, the task with id 3 takes a long time to complete and blocks all of the other finished tasks from reporting their status.
I think I can do this by moving the reporting (Console.Write) into a .ContinueWith call, but I don't have a UI thread, so how do I get a TaskScheduler to synchronize the .ContinueWith tasks?
What I have now:
static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);
    var tasks = new List<Task<int>>();
    for (var i = 0; i < 10; i++)
    {
        var num = i;
        var t = Task<int>.Factory.StartNew(() =>
        {
            if (num == 3)
            {
                Thread.Sleep(20000);
            }
            Thread.Sleep(new Random(num).Next(1000, 5000));
            Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
            return num;
        });
        tasks.Add(t);
    }
    foreach (var task in tasks)
    {
        Console.WriteLine("Completed {0} on {1}", task.Result, Thread.CurrentThread.ManagedThreadId);
    }
    Console.WriteLine("End of Main");
    Console.ReadKey();
}
I would like to move to this or something similar, but I need all of the Console.Write("Completed...") calls to happen on the same thread:
static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);
    for (var i = 0; i < 10; i++)
    {
        var num = i;
        Task<int>.Factory.StartNew(() =>
        {
            if (num == 3)
            {
                Thread.Sleep(20000);
            }
            Thread.Sleep(new Random(num).Next(1000, 10000));
            Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
            return num;
        }).ContinueWith(value =>
        {
            Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId);
        }
        /* need synchronization context */);
    }
    Console.WriteLine("End of Main");
    Console.ReadKey();
}
-- SOLUTION -- After getting some comments and reading some of the answers, this is the complete solution that does what I want. The goal here is to process several long-running tasks as fast as possible and then do something with the result of each task, one at a time.
static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);
    var results = new BlockingCollection<int>();
    Task.Factory.StartNew(() =>
    {
        while (!results.IsCompleted)
        {
            try
            {
                var x = results.Take();
                Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId);
            }
            catch (InvalidOperationException)
            {
            }
        }
        Console.WriteLine("\r\nNo more items to take.");
    });
    var tasks = new List<Task>();
    for (var i = 0; i < 10; i++)
    {
        var num = i;
        var t = Task.Factory.StartNew(() =>
        {
            if (num == 3)
            {
                Thread.Sleep(20000);
            }
            Thread.Sleep(new Random(num).Next(1000, 10000));
            Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
            results.Add(num);
        });
        tasks.Add(t);
    }
    Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => results.CompleteAdding());
    Console.WriteLine("End of Main");
    Console.ReadKey();
}
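If the main thread has nothing else to do, one possible variation on the solution above is to let Main itself drain the collection with GetConsumingEnumerable, so every "Completed" line is printed from the same thread and the try/catch around Take is no longer needed. This is only a sketch: the class name MainThreadReporter, the Run helper and the shortened sleep times are assumptions made for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not from the original post.
public static class MainThreadReporter
{
    // Starts `count` workers and reports every result on the calling thread.
    // Returns the managed thread ID used for each report, for inspection.
    public static List<int> Run(int count)
    {
        var results = new BlockingCollection<int>();
        var workers = new Task[count];
        for (var i = 0; i < count; i++)
        {
            var num = i;
            workers[i] = Task.Factory.StartNew(() =>
            {
                // Shortened stand-in for the long-running work (assumption).
                Thread.Sleep(new Random(num).Next(10, 200));
                results.Add(num);
            });
        }

        // Close the collection once every worker has finished.
        Task.Factory.ContinueWhenAll(workers, _ => results.CompleteAdding());

        var reportingThreads = new List<int>();
        // Blocks while the collection is empty; the loop ends after
        // CompleteAdding has been called and the remaining items are drained.
        foreach (var x in results.GetConsumingEnumerable())
        {
            Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId);
            reportingThreads.Add(Thread.CurrentThread.ManagedThreadId);
        }
        return reportingThreads;
    }

    public static void Main()
    {
        Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);
        Run(10);
        Console.WriteLine("End of Main");
    }
}
```

The trade-off is that Main blocks until all workers are done, so "End of Main" is printed last rather than first.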
You'll have to create a writer task of some sort. However, keep in mind that even this task can be rescheduled onto another native or managed thread! With the default scheduler in the TPL, you have no control over which managed thread receives the work.
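public class ConcurrentConsole
{
    // Lines waiting to be written by the writer task.
    private static BlockingCollection<string> output
        = new BlockingCollection<string>();

    // Returns an unstarted task; the caller must call Start() on it.
    public static Task CreateWriterTask(CancellationToken token)
    {
        return new Task(
            () =>
            {
                while (!token.IsCancellationRequested)
                {
                    // Take blocks until a line is available and throws
                    // OperationCanceledException when the token is cancelled.
                    string nextLine = output.Take(token);
                    Console.WriteLine(nextLine);
                }
            },
            token);
    }

    // Invokes the delegate immediately and queues the resulting string.
    public static void WriteLine(Func<string> writeLine)
    {
        output.Add(writeLine());
    }
}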
When I switched your code to use this I received the following output:
End of Main
Done 1 on 6
Completed 1 on 6
Done 5 on 9
Completed 5 on 9
Done 0 on 4
Completed 0 on 4
Done 2 on 5
Completed 2 on 13
Done 7 on 10
Completed 7 on 10
Done 4 on 8
Completed 4 on 5
Done 9 on 12
Completed 9 on 9
Done 6 on 6
Completed 6 on 5
Done 8 on 11
Completed 8 on 4
Done 3 on 7
Completed 3 on 7
Even with your code sending () => String.Format("Completed {0} on {1}"... to ConcurrentConsole.WriteLine, ensuring the ManagedThreadId would be picked up on the ConcurrentConsole Task, it would still alter which thread it ran on, although with less variability than the executing tasks.
You can use OrderedTaskScheduler to ensure only one task completion is run at a time; however, they will run on a threadpool thread (not necessarily all on the same thread).
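As a rough illustration of the "one at a time, but not necessarily the same thread" behaviour, here is a sketch that swaps in the built-in ConcurrentExclusiveSchedulerPair instead of OrderedTaskScheduler. It assumes .NET 4.5 or later; the class name ExclusiveContinuations, the Run helper, the concurrency counter and the shortened sleeps are all made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not from the original post.
public static class ExclusiveContinuations
{
    // Runs `count` workers whose continuations are scheduled exclusively.
    // Returns the highest number of continuations seen running at once.
    public static int Run(int count)
    {
        // Tasks queued to ExclusiveScheduler never overlap with each other.
        var exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var continuations = new List<Task>();
        var gate = new object();
        int running = 0, maxRunning = 0;

        for (var i = 0; i < count; i++)
        {
            var num = i;
            var c = Task.Factory.StartNew(() =>
            {
                // Shortened stand-in for the long-running work (assumption).
                Thread.Sleep(new Random(num).Next(10, 200));
                return num;
            }).ContinueWith(t =>
            {
                // Track how many continuations are active right now.
                var now = Interlocked.Increment(ref running);
                lock (gate) { maxRunning = Math.Max(maxRunning, now); }
                Console.WriteLine("Completed {0} on {1}", t.Result, Thread.CurrentThread.ManagedThreadId);
                Interlocked.Decrement(ref running);
            }, exclusive);
            continuations.Add(c);
        }

        Task.WaitAll(continuations.ToArray());
        return maxRunning;
    }

    public static void Main()
    {
        Console.WriteLine("Max concurrent continuations: {0}", Run(10));
    }
}
```

The thread IDs printed by the continuations may still differ from line to line, because the exclusive scheduler borrows threadpool threads; it only guarantees that the continuations do not overlap.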
If you really need them all on the same thread (not just one at a time), then you can use ActionThread from the Nito.Async library. It provides a SynchronizationContext for its code, which can be picked up by FromCurrentSynchronizationContext.
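For anyone who would rather not pull in a library, the same "everything on one thread" effect can be approximated with a small custom TaskScheduler that owns a dedicated thread. This is a minimal sketch, not how ActionThread is implemented: SingleThreadTaskScheduler and SingleThreadDemo are hypothetical names, and the sleep times are shortened for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: executes every queued task on one dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> queue = new BlockingCollection<Task>();
    private readonly Thread thread;

    public SingleThreadTaskScheduler()
    {
        thread = new Thread(() =>
        {
            // Runs until Dispose calls CompleteAdding and the queue is empty.
            foreach (var task in queue.GetConsumingEnumerable())
                TryExecuteTask(task);
        }) { IsBackground = true };
        thread.Start();
    }

    protected override void QueueTask(Task task) { queue.Add(task); }

    // Only inline when already on the dedicated thread, so no task
    // ever runs anywhere else.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() { return queue.ToArray(); }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    public void Dispose() { queue.CompleteAdding(); }
}

// Hypothetical demo class, not from the original post.
public static class SingleThreadDemo
{
    // Returns the distinct thread IDs on which the continuations ran.
    public static int[] Run(int count)
    {
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            var ids = new ConcurrentBag<int>();
            var continuations = new Task[count];
            for (var i = 0; i < count; i++)
            {
                var num = i;
                continuations[i] = Task.Factory.StartNew(() =>
                {
                    // Shortened stand-in for the long-running work (assumption).
                    Thread.Sleep(new Random(num).Next(10, 200));
                    return num;
                }).ContinueWith(t =>
                {
                    Console.WriteLine("Completed {0} on {1}", t.Result, Thread.CurrentThread.ManagedThreadId);
                    ids.Add(Thread.CurrentThread.ManagedThreadId);
                }, scheduler);
            }
            Task.WaitAll(continuations);
            return ids.Distinct().ToArray();
        }
    }

    public static void Main()
    {
        Console.WriteLine("Distinct reporting threads: {0}", Run(10).Length);
    }
}
```

The workers still run on the threadpool; only the continuations are funnelled onto the dedicated thread, which is the part the question cares about.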