
Queue calls to an async method

I have an async operation named Refresh. If a second call to Refresh is made before the first has finished, I need to queue it. This is what I have:

public async Task Refresh(RefreshArgs refreshArgs)
{
    await EnqueueRefreshTask(refreshArgs);
}

private Queue<RefreshArgs> refreshQueue =
    new Queue<RefreshArgs>();

private async Task EnqueueRefreshTask(RefreshArgs refreshArgs)
{
    refreshQueue.Enqueue(refreshArgs);

    await ProcessRefreshQueue();
}

private Task currentRefreshTask = null;

private async Task ProcessRefreshQueue()
{
    if ((currentRefreshTask == null) || (currentRefreshTask.IsCompleted))
    {
        if (refreshQueue.Count > 0)
        {
            var refreshArgs = refreshQueue.Dequeue();

            currentRefreshTask = DoRefresh(refreshArgs);
            await currentRefreshTask;

            await ProcessRefreshQueue();
        }           
    }
}

private async Task DoRefresh(RefreshArgs refreshArgs)
{
    // Lots of code here, including calls to a server that are executed with await.
    // Code outside my control may make another Refresh(args) call while this one is still processing.
    // I need this one to finish before processing the next.
}

It works, but I'm not sure it's the best way to do this with Tasks. Any thoughts?

Update:

I tried using ActionBlock:

public async Task Refresh(RefreshArgs refreshArgs)
{
    if (refreshActionBlock == null)
    {
        var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions();
        executionDataflowBlockOptions.MaxMessagesPerTask = 1;
        executionDataflowBlockOptions.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        refreshActionBlock = new ActionBlock<RefreshArgs>(args => DoRefresh(args), executionDataflowBlockOptions);
    }

    await refreshActionBlock.SendAsync(refreshArgs);
}

This queues DoRefresh and allows it to run on the UI thread (which I need). The problem is that SendAsync doesn't wait for the work of DoRefresh to finish.

SendAsync: "Asynchronously offers a message to the target message block, allowing for postponement". I'm only awaiting the send, not the action itself.

Doing this doesn't work as expected:

await Refresh(RefreshArgs.Something);
// other code goes here. It expects the first refresh to be finished.
await Refresh(RefreshArgs.SomethingElse);
// other code goes here. It expects the second refresh to be finished.

The ActionBlock will queue the second refresh, but the awaits fall through before the refresh is done. I need them to return when the work of DoRefresh is done.

mbursill asked Apr 10 '13 15:04


2 Answers

I think the simplest way to do this is to use an AsyncLock. You can get one from Stephen Cleary's library AsyncEx, or you can read Stephen Toub's article about how to build it yourself.

When you have AsyncLock, implementing your Refresh() is straightforward:

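// A single lock instance shared by all Refresh() calls on this object.
private readonly AsyncLock m_lock = new AsyncLock();

public async Task Refresh(RefreshArgs refreshArgs)
{
    // Waits asynchronously until any earlier Refresh() has released the lock.
    using (await m_lock.LockAsync())
    {
        // do your async work here
    }
}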

This ensures that calls to Refresh() execute one after another (never interleaved), and that the Task returned from Refresh() completes only after that refresh is actually done.
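
If taking a dependency on AsyncEx is not an option, the same idea can probably be approximated with the built-in SemaphoreSlim, whose WaitAsync() can act as an awaitable mutex. The following is only a minimal sketch under assumptions: the Refresher class, the Log list, and the use of a string in place of RefreshArgs are all hypothetical, and Task.Delay stands in for the real server calls.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class for illustration; not part of the original question.
public class Refresher
{
    // One permit: at most one DoRefresh runs at a time.
    private readonly SemaphoreSlim refreshGate = new SemaphoreSlim(1, 1);

    // Records start/end events so the ordering can be inspected.
    public List<string> Log { get; } = new List<string>();

    public async Task Refresh(string refreshArgs)
    {
        // Asynchronously wait until no other refresh holds the gate.
        await refreshGate.WaitAsync();
        try
        {
            await DoRefresh(refreshArgs);
        }
        finally
        {
            // Always release, even if DoRefresh throws.
            refreshGate.Release();
        }
    }

    private async Task DoRefresh(string refreshArgs)
    {
        Log.Add("start " + refreshArgs);
        await Task.Delay(50); // stand-in for awaited server calls
        Log.Add("end " + refreshArgs);
    }
}
```

Note that neither SemaphoreSlim nor AsyncLock is reentrant: if code running inside DoRefresh awaits another Refresh() call, that inner call waits for a lock the outer call still holds, and both stall. Firing off the nested call without awaiting it inside DoRefresh avoids this.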

You could use ActionBlock from TPL Dataflow to do the same, but you would also need to use TaskCompletionSource and it would be much more complicated than the AsyncLock version.
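
For comparison, here is a rough sketch of what the ActionBlock plus TaskCompletionSource variant might look like. The DataflowRefresher and RefreshRequest types are hypothetical names introduced for illustration, a string again stands in for RefreshArgs, and Task.Delay stands in for the real work. Each request carries its own TaskCompletionSource, so Refresh() can await the work itself rather than just the send.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical class for illustration; not part of the original answer.
public class DataflowRefresher
{
    // Pairs each request with the TaskCompletionSource its caller awaits.
    private sealed class RefreshRequest
    {
        public string Args { get; }
        public TaskCompletionSource<bool> Completion { get; } =
            new TaskCompletionSource<bool>();

        public RefreshRequest(string args) { Args = args; }
    }

    private readonly ActionBlock<RefreshRequest> refreshActionBlock;

    public DataflowRefresher()
    {
        // MaxDegreeOfParallelism defaults to 1, so requests are processed one at a time.
        refreshActionBlock = new ActionBlock<RefreshRequest>(async request =>
        {
            try
            {
                await DoRefresh(request.Args);
                request.Completion.SetResult(true);
            }
            catch (Exception ex)
            {
                // Forward failures to the caller instead of faulting the block.
                request.Completion.SetException(ex);
            }
        });
    }

    public async Task Refresh(string refreshArgs)
    {
        var request = new RefreshRequest(refreshArgs);
        await refreshActionBlock.SendAsync(request); // queued
        await request.Completion.Task;               // work actually finished
    }

    // Stand-in for the real refresh work.
    private Task DoRefresh(string refreshArgs) => Task.Delay(50);
}
```

To keep the work on the UI thread as in the question, the ExecutionDataflowBlockOptions with TaskScheduler.FromCurrentSynchronizationContext() shown there could presumably be passed as the second constructor argument. The extra request type and exception plumbing illustrate why this version is more involved than the lock-based one.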

svick answered Oct 14 '22 07:10


You can take a look at these posts; they cover asynchronous coordination (semaphores, reset events) and mutual exclusion using tasks:

[Edit: add part 6]

  • Building Async Coordination Primitives, Part 6: AsyncLock
polkduran answered Oct 14 '22 06:10