Task synchronization without a UI thread

In the code below I want to synchronize the reporting of the results of a list of tasks. This works now because task.Result blocks until the task completes. However, the task with id = 3 takes a long time to complete and blocks all of the other finished tasks from reporting their status.

I think that I can do this by moving the reporting (Console.WriteLine) into a .ContinueWith call, but I don't have a UI thread, so how do I get a TaskScheduler to synchronize the .ContinueWith tasks?

What I have now:

static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);

    var tasks = new List<Task<int>>();

    for (var i = 0; i < 10; i++)
    {
        var num = i;
        var t = Task<int>.Factory.StartNew(() =>
        {
           if (num == 3)
           {
               Thread.Sleep(20000);
           }
           Thread.Sleep(new Random(num).Next(1000, 5000));
           Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
           return num;
        });
        tasks.Add(t);
    }

    foreach (var task in tasks)
    {
        Console.WriteLine("Completed {0} on {1}", task.Result, Thread.CurrentThread.ManagedThreadId);
    }

    Console.WriteLine("End of Main");
    Console.ReadKey();
}

I would like to move to this or something similar, but I need all of the Console.WriteLine("Completed...") calls to happen on the same thread:

static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);

    for (var i = 0; i < 10; i++)
    {
        var num = i;
        Task<int>.Factory.StartNew(() =>
        {
           if (num == 3)
           {
               Thread.Sleep(20000);
           }
           Thread.Sleep(new Random(num).Next(1000, 10000));
           Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
           return num;
       }).ContinueWith(value =>
       {
           Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId);
       } 

     /* need synchronization context */);
    }

    Console.WriteLine("End of Main");
    Console.ReadKey();
}

-- SOLUTION -- After getting some comments and reading some of the answers, this is the complete solution that does what I want. The goal here is to process several long-running tasks as fast as possible and then do something with the result of each task, one at a time.

static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);

    var results = new BlockingCollection<int>();

    Task.Factory.StartNew(() =>
    {
        while (!results.IsCompleted)
        {
            try
            {
                var x = results.Take();
                Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId);
            }
            catch (InvalidOperationException)
            {
            }
        }
        Console.WriteLine("\r\nNo more items to take.");
    });

    var tasks = new List<Task>();

    for (var i = 0; i < 10; i++)
    {
        var num = i;
        var t = Task.Factory.StartNew(() =>
        {
            if (num == 3)
            {
                Thread.Sleep(20000);
            }
            Thread.Sleep(new Random(num).Next(1000, 10000));
            Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
            results.Add(num);
        });

        tasks.Add(t);
    }

    Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => results.CompleteAdding());

    Console.WriteLine("End of Main");
    Console.ReadKey();
}
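
A possible simplification of the consumer loop above, offered as a sketch rather than as the asker's code: BlockingCollection<T>.GetConsumingEnumerable() blocks while the collection is empty and ends once CompleteAdding() has been called and the collection is drained, so the IsCompleted check and the empty catch block are not needed. The class name ConsumingEnumerableSketch, the Run helper, and the shortened sleep times are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ConsumingEnumerableSketch
{
    // Runs `count` workers and returns the results in the order the consumer saw them.
    public static List<int> Run(int count)
    {
        var results = new BlockingCollection<int>();
        var seen = new List<int>();

        // Single consumer. LongRunning is only a hint that this task blocks for a while.
        var consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while empty; ends after CompleteAdding() once the collection is drained.
            foreach (var x in results.GetConsumingEnumerable())
            {
                Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId);
                seen.Add(x);
            }
        }, TaskCreationOptions.LongRunning);

        var workers = Enumerable.Range(0, count)
            .Select(num => Task.Factory.StartNew(() =>
            {
                Thread.Sleep(new Random(num).Next(10, 100)); // shortened stand-in for real work
                results.Add(num);
            }))
            .ToArray();

        Task.WaitAll(workers);      // every worker has added its result
        results.CompleteAdding();   // lets the consumer's foreach finish
        consumer.Wait();
        return seen;
    }

    static void Main()
    {
        var seen = Run(10);
        Console.WriteLine("Consumed {0} results", seen.Count);
    }
}
```

As in the solution above, the "Completed" lines come from a single consumer one at a time, but nothing here pins that consumer to a particular thread.
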
Ryan Pedersen asked Jul 01 '11 16:07


2 Answers

You'll have to create a writer task of some sort. Keep in mind, however, that even this task can be rescheduled onto another native or managed thread! With the default scheduler in the TPL you have no control over which managed thread receives the work.

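using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class ConcurrentConsole
{
    // Lines waiting to be written by the writer task.
    private static BlockingCollection<string> output
        = new BlockingCollection<string>();

    // Returns an unstarted task; the caller must call Start() on it.
    public static Task CreateWriterTask(CancellationToken token)
    {
        return new Task(
            () =>
            {
                while (!token.IsCancellationRequested)
                {
                    // Blocks until a line is available; throws
                    // OperationCanceledException once the token is cancelled.
                    string nextLine = output.Take(token);
                    Console.WriteLine(nextLine);
                }
            },
            token);
    }

    // Invokes the delegate on the calling thread and queues the resulting string.
    public static void WriteLine(Func<string> writeLine)
    {
        output.Add(writeLine());
    }
}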

When I switched your code to use this I received the following output:

End of Main
Done 1 on 6
Completed 1 on 6
Done 5 on 9
Completed 5 on 9
Done 0 on 4
Completed 0 on 4
Done 2 on 5
Completed 2 on 13
Done 7 on 10
Completed 7 on 10
Done 4 on 8
Completed 4 on 5
Done 9 on 12
Completed 9 on 9
Done 6 on 6
Completed 6 on 5
Done 8 on 11
Completed 8 on 4
Done 3 on 7
Completed 3 on 7

Even when your code passes () => String.Format("Completed {0} on {1}"... to ConcurrentConsole.WriteLine, so that the ManagedThreadId would be picked up on the ConcurrentConsole task, the thread it runs on still varies, although with less variability than the worker tasks.
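
If the output really has to come from one and the same thread, one option outside the TPL scheduler is to drain the queue from a dedicated Thread instead of a Task. The following is only a sketch of that idea, not the code used to produce the output above; SingleThreadConsole and its members are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: queues lines and prints all of them from one dedicated thread.
public sealed class SingleThreadConsole : IDisposable
{
    private readonly BlockingCollection<string> output = new BlockingCollection<string>();
    private readonly Thread writer;

    public int WriterThreadId { get { return writer.ManagedThreadId; } }

    public SingleThreadConsole()
    {
        writer = new Thread(() =>
        {
            // Runs on the dedicated thread until CompleteAdding() is called and the queue is empty.
            foreach (var line in output.GetConsumingEnumerable())
            {
                Console.WriteLine("{0} (written on {1})", line, Thread.CurrentThread.ManagedThreadId);
            }
        });
        writer.IsBackground = true;
        writer.Start();
    }

    // Safe to call from any thread; only queues the line.
    public void WriteLine(string line)
    {
        output.Add(line);
    }

    // Stops accepting lines, then waits for the writer thread to drain the queue.
    public void Dispose()
    {
        output.CompleteAdding();
        writer.Join();
    }
}

static class Program
{
    static void Main()
    {
        using (var console = new SingleThreadConsole())
        {
            var workers = new Task[5];
            for (var i = 0; i < workers.Length; i++)
            {
                var num = i;
                workers[i] = Task.Factory.StartNew(() =>
                    console.WriteLine(string.Format("Completed {0}", num)));
            }
            Task.WaitAll(workers);
        } // Dispose drains the queue and joins the writer thread
    }
}
```

Because the consumer is a plain thread rather than a scheduled task, every line is written on that thread, at the cost of keeping one thread alive for the lifetime of the helper.
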

user7116 answered Oct 09 '22 08:10


You can use OrderedTaskScheduler to ensure that only one task completion runs at a time; however, the completions will run on thread-pool threads (not necessarily all on the same thread).
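
To illustrate the "one at a time, but not on one thread" behavior without an extra library, here is a sketch that swaps in a different scheduler: the ExclusiveScheduler of ConcurrentExclusiveSchedulerPair, which ships with .NET 4.5 and later. It is not OrderedTaskScheduler and makes no promise about ordering; the class name ExclusiveContinuations and the Run helper are assumptions for the example.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ExclusiveContinuations
{
    // Returns the highest number of continuations observed running at the same time.
    public static int Run(int count)
    {
        // Runs at most one task at a time, on thread-pool threads.
        var exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        int running = 0, maxRunning = 0;

        var continuations = Enumerable.Range(0, count)
            .Select(num => Task.Factory.StartNew(() =>
            {
                Thread.Sleep(new Random(num).Next(10, 100)); // shortened stand-in for real work
                return num;
            })
            .ContinueWith(t =>
            {
                int now = Interlocked.Increment(ref running);
                maxRunning = Math.Max(maxRunning, now);
                Console.WriteLine("Completed {0} on {1}", t.Result, Thread.CurrentThread.ManagedThreadId);
                Interlocked.Decrement(ref running);
            }, exclusive)) // the continuation, not the work itself, is serialized
            .ToArray();

        Task.WaitAll(continuations);
        return maxRunning;
    }

    static void Main()
    {
        Console.WriteLine("Max concurrent continuations: {0}", Run(10));
    }
}
```

The thread ids printed by the continuations may still differ from line to line, which is the limitation described above.
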

If you really need them all on the same thread (not just one at a time), then you can use ActionThread from the Nito.Async library. It provides a SynchronizationContext for its code, which can be picked up by TaskScheduler.FromCurrentSynchronizationContext.
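
The general mechanism can be sketched without Nito.Async. The example below does not use ActionThread; it hand-rolls a minimal SynchronizationContext that runs posted callbacks on whichever thread calls its pump method, and then lets TaskScheduler.FromCurrentSynchronizationContext pick it up. SingleThreadContext, RunOnCurrentThread, Complete, and SameThreadContinuations are hypothetical names, and the sketch assumes count is greater than zero.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal context: queues posted callbacks and runs them on the pumping thread.
sealed class SingleThreadContext : SynchronizationContext
{
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> queue =
        new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    public override void Post(SendOrPostCallback d, object state)
    {
        queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotSupportedException("Synchronous Send is not supported in this sketch.");
    }

    // Blocks the calling thread and runs callbacks on it until Complete() is called.
    public void RunOnCurrentThread()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
            item.Key(item.Value);
        }
    }

    public void Complete()
    {
        queue.CompleteAdding();
    }
}

static class SameThreadContinuations
{
    // Returns the distinct thread ids on which the continuations ran.
    public static HashSet<int> Run(int count)
    {
        var previous = SynchronizationContext.Current;
        var context = new SingleThreadContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            var scheduler = TaskScheduler.FromCurrentSynchronizationContext();
            var threadIds = new HashSet<int>();

            var continuations = Enumerable.Range(0, count)
                .Select(num => Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(new Random(num).Next(10, 100)); // shortened stand-in for real work
                    return num;
                })
                .ContinueWith(t =>
                {
                    // Only ever runs on the pumping thread, so no locking is needed here.
                    threadIds.Add(Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine("Completed {0} on {1}", t.Result, Thread.CurrentThread.ManagedThreadId);
                }, scheduler))
                .ToArray();

            // Stop the pump once every continuation has run.
            Task.Factory.ContinueWhenAll(continuations, _ => context.Complete());
            context.RunOnCurrentThread();
            return threadIds;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    static void Main()
    {
        Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);
        var ids = Run(10);
        Console.WriteLine("Continuations used {0} distinct thread(s)", ids.Count);
    }
}
```

In this sketch the thread that calls RunOnCurrentThread plays the role a UI thread would normally play, so the main thread is blocked until all results have been reported.
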

Stephen Cleary answered Oct 09 '22 09:10