Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Azure ServiceBus & async - To be, or not to be?

I'm running Service Bus on Azure, pumping about 10-100 messages per second.

Recently I've switched to .net 4.5 and all excited refactored all the code to have 'async' and 'await' at least twice in each line to make sure it's done 'properly' :)

Now I'm wondering whether it's actually for better or for worse. If you could have a look at the code snippets and let me know what your thoughts are. I especially worried if the thread context switching is not giving me more grief than benefit, from all the asynchrony... (looking at !dumpheap it's definitely a factor)

Just a bit of description - I will be posting 2 methods - one that does a while loop on a ConcurrentQueue, waiting for new messages and the other method that sends one message at a time. I'm also using the Transient Fault Handling block exactly as Dr. Azure prescribed.

Sending loop (started at the beginning, waiting for new messages):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

Sending a message:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.

Btw do not worry about EnsureMessageSender, RecreateMessageFactory, EnsureTopicExists too much, they are not called that often.

Would I not be better of just having one background thread working through the message queue and sending messages synchronously, provided all I need is send one message at a time, not worry about the async stuff and avoid the overheads coming with it.

Note that usually it's a matter of milliseconds to send one Message to Azure Service Bus, it's not really expensive. (Except at times when it's slow, times out or there is a problem with Service Bus backend, it could be hanging for a while trying to send stuff).

Thanks and sorry for the long post,

Stevo

Proposed Solution

Would this example be a solution to my situation?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }
like image 514
user1275154 Avatar asked Mar 25 '13 10:03

user1275154


2 Answers

You say:

The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.

This is a good case for async. You save lots of threads here. Async reduces context switching because it is not thread-based. It does not context-switch in case of something requiring a wait. Instead, the next work item is being processed on the same thread (if there is one).

For that reason you async solution will definitely scale better than a synchronous one. Whether it actually uses less CPU at 50-100 instances of your workflow needs to be measured. The more instances there are the higher the probability of async being faster becomes.

Now, there is one problem with the implementation: You're using a ConcurrentQueue which is not async-ready. So you actually do use 50-100 threads even in your async version. They will either block (which you wanted to avoid) or busy-wait burning 100% CPU (which seems to be the case in your implementation!). You need to get rid of this problem and make the queuing async, too. Maybe a SemaphoreSlim is of help here as it can be waited on asynchronously.

like image 166
usr Avatar answered Nov 02 '22 22:11

usr


First, keep in mind that Task != Thread. Tasks (and async method continuations) are scheduled to the thread pool, where Microsoft has put in tons of optimizations that work wonders as long as your tasks are fairly short.

Reviewing your code, one line raises a flag: semaphore.WaitOne. I assume you're using this as a kind of signal that there is data available in the queue. This is bad because it's a blocking wait inside an async method. By using a blocking wait, the code changes from a lightweight continuation into a much heavier thread pool thread.

So, I would follow @usr's recommendation and replace the queue (and the semaphore) with an async-ready queue. TPL Dataflow's BufferBlock<T> is an async-ready producer/consumer queue available via NuGet. I recommend this one first because it sounds like your project could benefit from using dataflow more extensively than just as a queue (but the queue is a fine place to start).

Other async-ready data structures exist; my AsyncEx library has a couple of them. It's also not hard to build a simple one yourself; I have a blog post on the subject. But I recommend TPL Dataflow in your situation.

like image 4
Stephen Cleary Avatar answered Nov 02 '22 23:11

Stephen Cleary