
How to have multiple threads await a single Task?

I've read "Is it ok to await the same task from multiple threads - is await thread safe?" and I'm still not clear about the answer, so here's a specific use case.

I have a method that performs some async network I/O. Multiple threads can hit this method at once, and I don't want them all to invoke a network request. If a request is already in progress, I want to block/await the second and later threads, and have them all resume once the single I/O operation has completed.

How should I write the following pseudocode? I'm guessing each calling thread really needs to get its own Task, so each can get its own continuation, so instead of returning currentTask I should return a new Task which is completed by the "inner" Task from DoAsyncNetworkIO. Is there a clean way to do this, or do I have to hand-roll it?

static object mutex = new object();
static Task currentTask;

async Task Fetch()
{
    lock(mutex)
    {
        if(currentTask != null)
            return currentTask;
    }

    currentTask = DoAsyncNetworkIO();
    await currentTask;

    lock(mutex)
    {
        var task = currentTask;
        currentTask = null;
        return task;
    }
}
Andrew Bullock asked Jan 25 '16 09:01



2 Answers

You could use a SemaphoreSlim to ensure that only one thread actually executes the background task.

Assume your base task (the one actually doing the IO) is in a method called baseTask(), which I shall emulate like so:

static async Task baseTask()
{
    Console.WriteLine("Starting long method.");
    await Task.Delay(1000);
    Console.WriteLine("Finished long method.");
}

Then you can initialise a SemaphoreSlim like so, to act a bit like an AutoResetEvent with initial state set to true:

static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);
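
To make the AutoResetEvent comparison concrete, here is a minimal sketch of how a SemaphoreSlim(1, 1) responds to a zero-timeout WaitAsync. The class name SemaphoreDemo and the method TryTwice are made up for illustration and are not part of the answer's code:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class SemaphoreDemo
{
    // One permit available initially, and never more than one in total.
    static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);

    // Hypothetical helper: tries to take the semaphore twice without waiting.
    public static async Task<bool[]> TryTwice()
    {
        // A zero timeout means "try once, do not wait".
        bool first = await signal.WaitAsync(0);   // takes the only permit
        bool second = await signal.WaitAsync(0);  // no permit left, so this is false
        signal.Release();                         // give back the permit taken by 'first'
        return new[] { first, second };
    }

    static async Task Main()
    {
        bool[] results = await TryTwice();
        Console.WriteLine($"{results[0]} {results[1]}");  // prints "True False"
    }
}
```

The bool returned by WaitAsync(0) is what lets a caller tell whether it was the first one in.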

Then wrap the call to baseTask() in a method that checks signal to see if this is the first thread to try to run baseTask(), like so:

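static async Task<bool> taskWrapper()
{
    // A zero timeout tries to take the semaphore without waiting;
    // it succeeds only for the first caller.
    bool firstIn = await signal.WaitAsync(0);

    if (firstIn)
    {
        try
        {
            await baseTask();
        }
        finally
        {
            // Release even if baseTask() throws, so waiters are not stuck forever.
            signal.Release();
        }
    }
    else
    {
        // Wait for the first caller to finish, then pass the semaphore on.
        await signal.WaitAsync();
        signal.Release();
    }

    return firstIn;
}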

Then your multiple threads would await taskWrapper() rather than awaiting baseTask() directly.

Putting that all together in a compilable console application:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            for (int it = 0; it < 10; ++it)
            {
                Console.WriteLine($"\nStarting iteration {it}");
                Task[] tasks = new Task[5];

                for (int i = 0; i < 5; ++i)
                    tasks[i] = Task.Run(demoTask);

                Task.WaitAll(tasks);
            }

            Console.WriteLine("\nFinished");
            Console.ReadLine();
        }

        static async Task demoTask()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine($"Thread {id} starting");

            bool firstIn = await taskWrapper();

            Console.WriteLine($"Task {id}: executed: {firstIn}");
        }

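        static async Task<bool> taskWrapper()
        {
            // A zero timeout tries to take the semaphore without waiting;
            // it succeeds only for the first caller.
            bool firstIn = await signal.WaitAsync(0);

            if (firstIn)
            {
                try
                {
                    await baseTask();
                }
                finally
                {
                    // Release even if baseTask() throws, so waiters are not stuck forever.
                    signal.Release();
                }
            }
            else
            {
                // Wait for the first caller to finish, then pass the semaphore on.
                await signal.WaitAsync();
                signal.Release();
            }

            return firstIn;
        }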

        static async Task baseTask()
        {
            Console.WriteLine("Starting long method.");
            await Task.Delay(1000);
            Console.WriteLine("Finished long method.");
        }

        static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);
    }
}

(The methods are all static because they are in a console app; in real code they would be non-static methods.)

Matthew Watson answered Nov 07 '22 10:11



await doesn't necessarily use continuations (the Task.ContinueWith kind) at all. Even when it does, you can have multiple continuations on one Task - they just can't all run synchronously (and you might run into some issues if you have a synchronization context).
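
As a rough illustration of several callers awaiting one Task, the sketch below uses a TaskCompletionSource as a stand-in for the single I/O operation. The names SharedAwaitDemo, Waiter and Run are invented for this example:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class SharedAwaitDemo
{
    // Each waiter awaits the same Task<int> instance and observes the same result.
    static async Task<int> Waiter(Task<int> shared)
    {
        return await shared;
    }

    public static async Task<int[]> Run()
    {
        // Stand-in for the single I/O operation (an assumption for this demo).
        var source = new TaskCompletionSource<int>();
        Task<int> shared = source.Task;

        // Queue five waiters on thread-pool threads. Some may reach the await
        // before the shared task completes and some after; both cases work.
        Task<int>[] waiters = Enumerable.Range(0, 5)
            .Select(_ => Task.Run(() => Waiter(shared)))
            .ToArray();

        source.SetResult(42);  // complete the shared task exactly once
        return await Task.WhenAll(waiters);
    }

    static async Task Main()
    {
        int[] results = await Run();
        Console.WriteLine(string.Join(",", results));  // prints 42,42,42,42,42
    }
}
```

Every waiter resumes with the same result, whether it started awaiting before or after the shared task completed.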

Do note that your pseudo-code isn't thread-safe, though - you can't just do currentTask = DoAsyncNetworkIO(); outside of a lock. Only the await itself is thread-safe, and even then, only because the Task class that you're awaiting implements the await contract in a thread-safe way. Anyone can write their own awaiter/awaitable, so make sure to pay attention :)
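
One possible way to address this, sketched under assumptions rather than offered as the definitive fix, is to make Fetch a non-async method that checks and assigns currentTask under a single lock and hands the same Task to every caller. The body of DoAsyncNetworkIO, the Calls counter and the helper RunAndClear are all hypothetical stand-ins added for the demo:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class SharedFetch
{
    static readonly object mutex = new object();
    static Task currentTask;      // the in-flight operation, or null if none
    public static int Calls;      // counts real I/O starts (demo only)

    // Hypothetical stand-in for the question's DoAsyncNetworkIO.
    static async Task DoAsyncNetworkIO()
    {
        Interlocked.Increment(ref Calls);
        await Task.Delay(200);
    }

    // Not async: returns the shared Task itself, so all callers await the same one.
    public static Task Fetch()
    {
        lock (mutex)
        {
            if (currentTask == null)
                currentTask = RunAndClear();  // check and assignment under one lock
            return currentTask;
        }
    }

    static async Task RunAndClear()
    {
        // Yield first so the reset below cannot run before Fetch has
        // assigned currentTask, even if the I/O completes synchronously.
        await Task.Yield();
        try
        {
            await DoAsyncNetworkIO();
        }
        finally
        {
            lock (mutex) { currentTask = null; }  // allow a fresh request next time
        }
    }

    static async Task Main()
    {
        // Five concurrent callers; those arriving while a request is in flight share it.
        Task[] callers = new Task[5];
        for (int i = 0; i < 5; ++i)
            callers[i] = Task.Run(() => Fetch());

        await Task.WhenAll(callers);
        Console.WriteLine($"I/O started {Calls} time(s)");
    }
}
```

Because the shared Task is returned directly, an exception from the I/O is observed by every caller that awaited it, and the next call to Fetch after completion starts a new request.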

Luaan answered Nov 07 '22 12:11
