
Representing asynchronous sequences in C# 5

How should you use C# 5's async to represent a sequence of asynchronous tasks? For example, if we wanted to download numbered files from a server and return each one as we get it, how can we implement a method such as this?

public async IEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; ; i++) {
        yield return await DownloadFile(string.Format(format, i));
    }
}
configurator asked Oct 03 '11 19:10



3 Answers

It seems to me you want something very similar to BlockingCollection<T>, but one that uses Tasks and awaiting instead of blocking.

Specifically, you want something you can add to without blocking or waiting, but where removing an item when none is available gives you a Task you can await until an item arrives.

The public interface could look like this:

public class AsyncQueue<T>
{
    public bool IsCompleted { get; }

    public Task<T> DequeueAsync();

    public void Enqueue(T item);

    public void FinishAdding();
}

FinishAdding() is necessary so that consumers know when to stop dequeuing.

With this, your code could look like this (m_queue is AsyncQueue<File>):

const string format = "http://example.com/files/{0}.png";

var tasks = Enumerable.Range(0, 10)
    .Select(i => DownloadAndEnqueue(string.Format(format, i)))
    .ToArray();

Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());

…

static async Task DownloadAndEnqueue(string url)
{
    m_queue.Enqueue(await DownloadFile(url));
}

It's not as nice as the syntax you imagined, but it should work.

And the implementation of AsyncQueue<T>? It uses two queues. One holds completed work that hasn't been dequeued yet. The other holds Tasks (actually, TaskCompletionSource<T> instances) that have already been handed out by DequeueAsync() but don't have a result yet.

When you dequeue and there is completed work in the first queue, just return an item from there (wrapped using Task.FromResult()). If that queue is empty, create a new TaskCompletionSource<T>, add it to the second queue and return its Task.

When you enqueue completed work and there are Tasks waiting in the second queue, remove one and complete it with the result you now have. If the Task queue is empty, add the work to the first queue.

With this, you can dequeue and enqueue as many times as you want, in any order, and it will work correctly. When you know there won't be any new work, call FinishAdding(). Any Tasks still waiting at that point fail with an InvalidOperationException, as does any later call to Enqueue() or to DequeueAsync() on an empty queue.

In other words:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly object m_lock = new object();

    private bool m_finishedAdding = false;

    private readonly Queue<T> m_overflowQueue = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
        new Queue<TaskCompletionSource<T>>();

    public bool IsCompleted
    {
        get
        {
            // Read both fields under the lock so the answer is consistent.
            lock (m_lock)
                return m_finishedAdding && m_overflowQueue.Count == 0;
        }
    }

    public Task<T> DequeueAsync()
    {
        Task<T> result;
        lock (m_lock)
        {
            if (m_overflowQueue.Count > 0)
                result = Task.FromResult(m_overflowQueue.Dequeue());
            else if (!m_finishedAdding)
            {
                var tcs = new TaskCompletionSource<T>();
                m_underflowQueue.Enqueue(tcs);
                result = tcs.Task;
            }
            else
                throw new InvalidOperationException();
        }
        return result;
    }

    public void Enqueue(T item)
    {
        lock (m_lock)
        {
            if (m_finishedAdding)
                throw new InvalidOperationException();

            if (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetResult(item);
            }
            else
                m_overflowQueue.Enqueue(item);
        }
    }

    public void FinishAdding()
    {
        lock (m_lock)
        {
            m_finishedAdding = true;

            while (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetException(new InvalidOperationException());
            }
        }
    }
}
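
On the consuming side, a loop along these lines might drain the queue. This is only a sketch: the AsyncQueueConsumer class and its handle callback are hypothetical names, and the dequeue operation is passed in as a delegate so the snippet compiles on its own. It relies on the behaviour described above, where a dequeue after FinishAdding() on an empty queue fails with InvalidOperationException.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the AsyncQueue<T> class above.
public static class AsyncQueueConsumer
{
    // Repeatedly awaits dequeueAsync and passes each item to handle,
    // stopping when the dequeue fails with InvalidOperationException.
    public static async Task ConsumeAsync<T>(Func<Task<T>> dequeueAsync, Action<T> handle)
    {
        while (true)
        {
            T item;
            try
            {
                // Covers both a synchronous throw and a faulted Task.
                item = await dequeueAsync();
            }
            catch (InvalidOperationException)
            {
                // Adding has finished and nothing is left to dequeue.
                break;
            }

            // Called outside the try block so that an exception thrown
            // by the handler is not mistaken for the end of the queue.
            handle(item);
        }
    }
}
```

With the queue from the example it could be called as AsyncQueueConsumer.ConsumeAsync(() => m_queue.DequeueAsync(), file => { /* use file */ }). Checking IsCompleted before each dequeue would look tidier, but the last item could be taken by another consumer between the check and the call, so the exception still has to be handled.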

If you wanted to limit the size of the work queue (and thus throttle producers that are too fast), you could make Enqueue() return a Task too, which would require another queue for the producers that are waiting.
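
One possible shape for such a bounded queue is sketched below. Note that it swaps in a different technique from the extra queue described above: two SemaphoreSlim instances (from .NET 4.5) count free slots and available items. The name BoundedAsyncQueue<T> is made up for this illustration, and FinishAdding() is left out to keep it short.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical bounded variant; completion handling is omitted.
public class BoundedAsyncQueue<T>
{
    private readonly object m_lock = new object();
    private readonly Queue<T> m_queue = new Queue<T>();

    // Counts the slots that producers may still fill.
    private readonly SemaphoreSlim m_freeSlots;

    // Counts the items that consumers may take.
    private readonly SemaphoreSlim m_items = new SemaphoreSlim(0);

    public BoundedAsyncQueue(int capacity)
    {
        m_freeSlots = new SemaphoreSlim(capacity);
    }

    // The returned Task completes once the item has been stored,
    // which may be delayed while the queue is full.
    public async Task EnqueueAsync(T item)
    {
        await m_freeSlots.WaitAsync();
        lock (m_lock)
            m_queue.Enqueue(item);
        m_items.Release();
    }

    // The returned Task completes once an item is available.
    public async Task<T> DequeueAsync()
    {
        await m_items.WaitAsync();
        T item;
        lock (m_lock)
            item = m_queue.Dequeue();
        m_freeSlots.Release();
        return item;
    }
}
```

A producer that awaits EnqueueAsync() is slowed down to the pace of the consumers whenever the queue is at capacity.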

svick answered Oct 11 '22 13:10



A true sequence doesn't work well directly with async/await, because tasks only return a single value. You need an actual enumerable type, such as IAsyncEnumerator<T> in Ix-Async (or AsyncEx). The design of IAsyncEnumerator<T> is described in this Channel9 video.
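
To give a rough idea of what such a type looks like, here is a minimal sketch. ISimpleAsyncEnumerator<T>, TaskSequenceEnumerator<T> and ForEachAsync are hypothetical names invented for this illustration; the real interfaces in Ix-Async and AsyncEx differ in detail (cancellation and disposal, for example).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface: like IEnumerator<T>, but moving to the
// next element is an asynchronous operation.
public interface ISimpleAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Adapts a sequence of tasks, yielding each result in sequence order.
public sealed class TaskSequenceEnumerator<T> : ISimpleAsyncEnumerator<T>
{
    private readonly IEnumerator<Task<T>> m_tasks;

    public TaskSequenceEnumerator(IEnumerable<Task<T>> tasks)
    {
        m_tasks = tasks.GetEnumerator();
    }

    public T Current { get; private set; }

    public async Task<bool> MoveNextAsync()
    {
        if (!m_tasks.MoveNext())
            return false;

        // Wait for the next task, then expose its result as Current.
        Current = await m_tasks.Current;
        return true;
    }
}

public static class SimpleAsyncEnumeratorExtensions
{
    // The asynchronous counterpart of a foreach loop.
    public static async Task ForEachAsync<T>(
        this ISimpleAsyncEnumerator<T> source, Action<T> action)
    {
        while (await source.MoveNextAsync())
            action(source.Current);
    }
}
```

If the task sequence is lazy, for instance a Select over the file numbers that calls DownloadFile, each download starts only when the consumer asks for the next element.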

Stephen Cleary answered Oct 11 '22 14:10



I know it's been a while, but I've written something to closely emulate "yield return" for async enumerables here. No complicated code needed.

You use it like:

public IAsyncEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    return AsyncEnumerable.Create(async y =>
    {
        for (int i = 0; ; i++) {
            await y.YieldReturn(await DownloadFile(string.Format(format, i)));
        }
    });
}

I normally avoid advertising my own code here but this is a clearly needed feature in C# 6.0, so I hope you find it useful in C# 5.0 if you're still stuck on this.

Cory Nelson answered Oct 11 '22 14:10
