
Preventing task from running on certain thread

I have been struggling a bit with some async await stuff. I am using RabbitMQ for sending/receiving messages between some programs.

As a bit of background, the RabbitMQ client uses three or so threads that I can see: a connection thread and two heartbeat threads. Whenever a message is received via TCP, the connection thread handles it and calls a callback which I have supplied via an interface. The documentation says that it is best to avoid doing lots of work during this call, since it's done on the same thread as the connection and the connection needs to keep processing. They supply a QueueingBasicConsumer which has a blocking 'Dequeue' method that is used to wait for a message to be received.

I wanted my consumers to be able to actually release their thread context during this waiting time so somebody else could do some work, so I decided to use async/await tasks. I wrote an AwaitableBasicConsumer class which uses TaskCompletionSources in the following fashion:

I have an awaitable Dequeue method:

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    //we are enqueueing a TCS. This is a "read"
    rwLock.EnterReadLock();

    try
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //if we are cancelled before we finish, this will cause the tcs to become cancelled
        cancellationToken.Register(() =>
        {
            tcs.TrySetCanceled();
        });

        //if there is something in the undelivered queue, the task will be immediately completed
        //otherwise, we queue the task into deliveryTCS
        if (!TryDeliverUndelivered(tcs))
        {
            deliveryTCS.Enqueue(tcs);
        }

        return tcs.Task;
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

The callback which the RabbitMQ client calls fulfills the tasks. It is called from the context of the AMQP Connection thread:

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    //we want nothing added while we remove. We also block until everybody is done.
    rwLock.EnterWriteLock();
    try
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        bool sent = false;
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            //once we manage to actually set somebody's result, we are done with handling this
            if (tcs.TrySetResult(e))
            {
                sent = true;
                break;
            }
        }

        //if nothing was sent, we queue up what we got so that somebody can get it later.
        /**
         * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
         * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
         * doing our thing here.
         */
        if (!sent)
        {
            undelivered.Enqueue(e);
        }
    }
    finally
    {
        rwLock.ExitWriteLock();
    }
}

rwLock is a ReaderWriterLockSlim. The two queues (deliveryTCS and undelivered) are ConcurrentQueues.

The problem:

Every once in a while, the method that awaits the dequeue method throws an exception. This would not normally be an issue since that method is also async and so it enters the "Exception" completion state that tasks enter. The problem comes in the situation where the task that calls DequeueAsync is resumed after the await on the AMQP Connection thread that the RabbitMQ client creates. Normally I have seen tasks resume onto the main thread or one of the worker threads floating around. However, when it resumes onto the AMQP thread and an exception is thrown, everything stalls. The task does not enter its "Exception state" and the AMQP Connection thread is left saying that it is executing the method that had the exception occur.

My main confusion here is why this doesn't work:

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards

ConsumerTaskState state = new ConsumerTaskState()
{
    Connection = connection,
    CancellationToken = cancellationToken
};

//if there is a problem, we execute our faulted method
//PROBLEM: If the task fails when it is resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);

Here is the RunAsync method, set up for the test:

public async Task RunAsync()
{
    using (var channel = this.Connection.CreateModel())
    {
        ...
        AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
        var result = consumer.DequeueAsync(this.CancellationToken);

        //wait until we find something to eat
        await result;

        throw new NotImplementedException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
        ...
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}

Reading what I have written, I see that I may not have explained my problem very well. If clarification is needed, just ask.

My solutions that I have come up with are as follows:

  • Remove all Async/Await code and just use straight up threads and block. Performance will be decreased, but at least it won't stall sometimes
  • Somehow exempt the AMQP threads from being used for resuming tasks. I assume that they were sleeping or something and then the default TaskScheduler decided to use them. If I could find a way to tell the task scheduler that those threads are off limits, that would be great.

Does anyone have an explanation for why this is happening, or any suggestions for solving it? Right now I am removing the async code just so that the program is reliable, but I really want to understand what is going on here.

Los Frijoles asked Oct 30 '13 18:10



1 Answer

I first recommend that you read my async intro, which explains in precise terms how await will capture a context and use that to resume execution. In short, it will capture the current SynchronizationContext (or the current TaskScheduler if SynchronizationContext.Current is null).

The other important detail is that async continuations are scheduled with TaskContinuationOptions.ExecuteSynchronously (as @svick pointed out in a comment). I have a blog post about this but AFAIK it is not officially documented anywhere. This detail does make writing an async producer/consumer queue difficult.

The reason await isn't "switching back to the original context" is (probably) because the RabbitMQ threads don't have a SynchronizationContext or TaskScheduler - thus, the continuation is executed directly when you call TrySetResult because those threads look just like regular thread pool threads.
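A minimal sketch of that behavior, assuming .NET Framework 4.6 or later (where TaskCreationOptions.RunContinuationsAsynchronously exists; it was not available when this question was asked). The class and method names are hypothetical, and a plain Thread stands in for the AMQP connection thread. With default options the code after the await should resume on the thread that calls TrySetResult; with RunContinuationsAsynchronously it should be queued to the thread pool instead.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationThreadDemo
{
    // Hypothetical helper: returns true if the code after the await ran on
    // the same thread that called TrySetResult.
    public static bool ResumesOnCompletingThread(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<int>(options);
        var resumed = new ManualResetEventSlim();
        int resumedOn = -1;
        int completerId = -1;

        async Task ConsumerAsync()
        {
            // ConfigureAwait(false) so no SynchronizationContext is captured.
            await tcs.Task.ConfigureAwait(false);
            resumedOn = Environment.CurrentManagedThreadId;
            resumed.Set();
        }

        // Runs synchronously up to the await, so the continuation is
        // registered before the completing thread starts.
        Task consumer = ConsumerAsync();

        // Stand-in for the AMQP connection thread: a dedicated thread with
        // no SynchronizationContext and the default TaskScheduler.
        var completer = new Thread(() =>
        {
            completerId = Environment.CurrentManagedThreadId;
            tcs.TrySetResult(42);
            // Stay alive until the consumer has resumed, so the two thread
            // IDs being compared belong to threads that coexisted.
            resumed.Wait();
        });
        completer.Start();
        completer.Join();
        consumer.Wait();

        return resumedOn == completerId;
    }

    public static void Main()
    {
        Console.WriteLine(ResumesOnCompletingThread(TaskCreationOptions.None));
        Console.WriteLine(ResumesOnCompletingThread(
            TaskCreationOptions.RunContinuationsAsynchronously));
    }
}
```

If the TaskCompletionSource in the question's DequeueAsync were created with RunContinuationsAsynchronously, the consumer's continuation (and any exception it throws) would no longer run inside HandleBasicDeliver on the connection thread. This is one possible mitigation under the stated version assumption, not a replacement for the queue recommendation that follows.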

BTW, reading through your code, I suspect your use of a reader/writer lock and concurrent queues is incorrect. I can't be sure without seeing the whole code, but that's my impression.

I strongly recommend you use an existing async queue and build a consumer around that (in other words, let someone else do the hard part :). The BufferBlock<T> type in TPL Dataflow can act as an async queue; that would be my first recommendation if you have Dataflow available on your platform. Otherwise, I have an AsyncProducerConsumerQueue type in my AsyncEx library, or you could write your own (as I describe on my blog).

Here's an example using BufferBlock<T>:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
    _queue.Post(e);
}

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    return _queue.ReceiveAsync(cancellationToken);
}

In this example, I'm keeping your DequeueAsync API. However, once you start using TPL Dataflow, consider using it elsewhere as well. When you need a queue like this, it's common to find other parts of your code that would also benefit from a dataflow approach. E.g., instead of having a bunch of methods calling DequeueAsync, you could link your BufferBlock to an ActionBlock.
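A rough sketch of that linking, assuming the System.Threading.Tasks.Dataflow package is available. To stay self-contained it uses string messages instead of BasicDeliverEventArgs, and the class name, method name and the handler body are hypothetical placeholders rather than anything from the RabbitMQ client.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowConsumerSketch
{
    // Hypothetical helper: posts the messages into a BufferBlock and returns
    // what the linked ActionBlock handled, in the order it handled them.
    public static async Task<string[]> ProcessAllAsync(params string[] messages)
    {
        var handled = new ConcurrentQueue<string>();

        // The queue that HandleBasicDeliver would Post into.
        var queue = new BufferBlock<string>();

        // The consumer: the delegate runs on the block's TaskScheduler (the
        // thread pool by default), not on the thread that calls Post.
        var consumer = new ActionBlock<string>(message => handled.Enqueue(message));

        // PropagateCompletion forwards queue.Complete() to the consumer.
        queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var message in messages)
        {
            queue.Post(message);
        }

        // Signal that no more messages are coming, then wait for the
        // consumer to drain everything it was given.
        queue.Complete();
        await consumer.Completion.ConfigureAwait(false);

        return handled.ToArray();
    }

    public static void Main()
    {
        string[] result = ProcessAllAsync("a", "b", "c").GetAwaiter().GetResult();
        Console.WriteLine(string.Join(",", result));
    }
}
```

With this shape there is no DequeueAsync loop at all: the delivery callback only posts, and an exception thrown by the handler faults consumer.Completion instead of surfacing on the thread that posted the message.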

Stephen Cleary answered Oct 16 '22 23:10