 

Awaitable Task-based queue

I'm wondering if there is an existing implementation/wrapper for ConcurrentQueue, similar to BlockingCollection, where taking from the collection does not block but is instead asynchronous, so the caller awaits until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
Asked by spender (Oct 23 '11)

4 Answers

I don't know of a lock-free solution, but you can take a look at the new TPL Dataflow library, part of the Async CTP (since released as the System.Threading.Tasks.Dataflow NuGet package). A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>(); 

Production and consumption are most easily done via extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13); 

and consumption is async-ready:

int item = await buffer.ReceiveAsync(); 

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
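For context, here is a minimal producer/consumer sketch combining these calls. It is an illustration only, not part of the original answer; the item count, the use of Task.Run, and top-level statements are assumptions:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;   // NuGet: System.Threading.Tasks.Dataflow

var buffer = new BufferBlock<int>();

// Producer: post a few items, then signal that no more will arrive.
var producer = Task.Run(() =>
{
    for (int i = 0; i < 5; i++)
        buffer.Post(i);
    buffer.Complete();
});

// Consumer: OutputAvailableAsync completes with false once the block
// is completed and drained, which ends the loop cleanly.
while (await buffer.OutputAvailableAsync())
{
    int item = await buffer.ReceiveAsync();
    Console.WriteLine(item);
}

await producer;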

Answered by Stephen Cleary

A simple approach using C# 8.0 IAsyncEnumerable and the Dataflow library

// Instantiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach (int i in queue)
{
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

With an implementation of AsyncQueue as follows:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try
        {
            // Return new elements until cancellationToken is triggered.
            while (true)
            {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        }
        finally
        {
            _enumerationSemaphore.Release();
        }
    }
}
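A short usage sketch showing the WithCancellation route the comment above mentions. The producer loop, the one-second delay, and the five-second timeout are illustrative assumptions, not part of the answer:

var queue = new AsyncQueue<int>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

// Producer: enqueue an item every second until cancellation.
var producer = Task.Run(async () =>
{
    for (int i = 0; !cts.IsCancellationRequested; i++)
    {
        queue.Enqueue(i);
        await Task.Delay(1000);
    }
});

try
{
    // Consumer: stops with an OperationCanceledException after ~5 seconds.
    await foreach (int i in queue.WithCancellation(cts.Token))
        Console.WriteLine(i);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumption canceled.");
}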
Answered by Bruno Zell

There is an official way to do this now: System.Threading.Channels. It's built into the core runtime on .NET Core 3.0 and higher (including .NET 5.0 and 6.0), but it's also available as a NuGet package on .NET Standard 2.0 and 2.1. You can read through the docs here.

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

To enqueue work:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

To complete the channel:

channel.Writer.TryComplete();

To read from the channel:

var i = await channel.Reader.ReadAsync();

Or, if you have .NET Core 3.0 or higher:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}
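Putting those snippets together, here is a minimal end-to-end sketch. The producer loop, item count, use of Task.Run, and top-level statements are illustrative assumptions:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Producer: write a few items, then mark the channel complete.
var producer = Task.Run(() =>
{
    for (int i = 0; i < 5; i++)
        channel.Writer.TryWrite(i);
    channel.Writer.TryComplete();
});

// Consumer: ReadAllAsync ends once the channel is completed and drained.
await foreach (int i in channel.Reader.ReadAllAsync())
    Console.WriteLine(i);

await producer;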
Answered by kanders84152


One simple and easy way to implement this is with a SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        // Wait returns false on timeout; don't dequeue from an empty queue in that case.
        if (!semaphore.Wait(timeSpan, cancellationToken))
            throw new TimeoutException("No item was enqueued within the given timeout.");
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        // WaitAsync returns false on timeout; don't dequeue from an empty queue in that case.
        if (!await semaphore.WaitAsync(timeSpan, cancellationToken))
            throw new TimeoutException("No item was enqueued within the given timeout.");
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

The beauty of this is that the SemaphoreSlim handles all of the complexity of implementing the Wait() and WaitAsync() functionality. The downside is that the queue length is effectively tracked twice, once by the semaphore's count and once by the queue itself, and the class relies on keeping the two in sync.
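A brief usage sketch of this AwaitableQueue; the item value and the five-second timeout are illustrative assumptions:

var queue = new AwaitableQueue<string>();

// Producer side: each Enqueue releases one semaphore count.
queue.Enqueue("hello");

// Consumer side: awaits until an item is available, up to the given timeout.
string item = await queue.WhenDequeue(TimeSpan.FromSeconds(5), CancellationToken.None);
Console.WriteLine(item);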

Answered by Ryan