Asynchronously posting to Azure Queues

I am trying to enqueue messages in an Azure queue asynchronously like this:

private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
    var tasks = messages.Select(msg => _queue.AddMessageAsync(new CloudQueueMessage(msg),
        null, null, null, null));

    await Task.WhenAll(tasks);
}

If I understand it correctly, this says "start enqueuing one item after the other without waiting for them to be posted, keep a reference to each task, and then wait until all of them have been posted".

This code works fine in most cases, but for a large number of items (5000), it starts enqueuing and then throws a timeout exception (after having enqueued ~3500 items).

I solved it by waiting for each one to finish before continuing with the next one:

private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
    foreach (var message in messages) {
        await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, null, null);
    }
}

Can anyone explain why this happened?

Exception:

System.AggregateException, which wraps many exceptions like this one:

Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass4.<CreateCallbackVoid>b__3(IAsyncResult ar)
Request Information
RequestID:
RequestDate:
StatusMessage:
<---
---> (Inner Exception #1) Microsoft.WindowsAzure.Storage.StorageException: The client could not finish the operation within specified timeout. ---> System.TimeoutException: The client could not finish the operation within specified timeout.
--- End of inner exception stack trace ---
Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result)

zafeiris.m asked Feb 19 '15 14:02


2 Answers

A single queue in Azure Storage is designed for a throughput of up to 2,000 messages per second.

See: Azure Storage Scalability and Performance Targets

When your application reaches the limit of what a partition can handle for your workload, Azure Storage will begin to return error code 503 (Server Busy) or error code 500 (Operation Timeout) responses. When this occurs, the application should use an exponential backoff policy for retries. The exponential backoff allows the load on the partition to decrease, and to ease out spikes in traffic to that partition.
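
One way to act on this advice while keeping the calls asynchronous is to cap the number of requests in flight and retry failures with a growing delay. The following is a minimal sketch using only the base class library, not the SDK's own retry mechanism. The names ThrottledEnqueue, enqueueAsync, maxConcurrency, maxAttempts and initialDelayMs, and their default values, are assumptions made for illustration. The enqueueAsync delegate stands in for a call such as msg => _queue.AddMessageAsync(new CloudQueueMessage(msg), null, null, null, null).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledEnqueue
{
    // All names and default values here are hypothetical; tune them for your workload.
    public static async Task EnqueueItemsAsync(
        IEnumerable<string> messages,
        Func<string, Task> enqueueAsync,
        int maxConcurrency = 32,
        int maxAttempts = 5,
        int initialDelayMs = 200)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = messages.Select(async msg =>
            {
                // At most maxConcurrency operations get past this point at once.
                await gate.WaitAsync();
                try
                {
                    await WithBackoffAsync(() => enqueueAsync(msg), maxAttempts, initialDelayMs);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }

    private static async Task WithBackoffAsync(Func<Task> operation, int maxAttempts, int initialDelayMs)
    {
        var delay = TimeSpan.FromMilliseconds(initialDelayMs);
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation();
                return;
            }
            // Catching every exception keeps the sketch short; real code would
            // retry only on throttling or timeout errors. The last failure is rethrown.
            catch (Exception) when (attempt < maxAttempts)
            {
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2); // double the wait each time
            }
        }
    }
}
```

With this in place, the original method body could become a single call such as await ThrottledEnqueue.EnqueueItemsAsync(messages, msg => _queue.AddMessageAsync(new CloudQueueMessage(msg), null, null, null, null)). The cap keeps thousands of requests from being issued at once, and the doubling delay gives the partition time to recover between attempts.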

Mårten Wikström answered Sep 28 '22 06:09


It seems that you can make this more robust by passing a QueueRequestOptions to AddMessageAsync.

Before the request is sent, the properties set on these options are applied to the command.

I would try passing a QueueRequestOptions with larger values for MaximumExecutionTime and ServerTimeout.
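
As a rough illustration of that suggestion, the sketch below builds the options and passes them through the same five-argument AddMessageAsync overload used in the question. It assumes the legacy WindowsAzure.Storage package that the question is based on. The helper name QueueTimeouts.AddWithLongerTimeoutsAsync and both timeout values are hypothetical and not recommendations.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Queue;

public static class QueueTimeouts
{
    // Hypothetical helper; the timeout values are illustrative assumptions.
    public static Task AddWithLongerTimeoutsAsync(CloudQueue queue, string message)
    {
        var options = new QueueRequestOptions
        {
            // Timeout sent to the service for a single request.
            ServerTimeout = TimeSpan.FromSeconds(60),
            // Client-side limit for the whole operation, including retries.
            MaximumExecutionTime = TimeSpan.FromMinutes(5)
        };

        // Arguments: message, timeToLive, initialVisibilityDelay, options, operationContext.
        return queue.AddMessageAsync(new CloudQueueMessage(message), null, null, options, null);
    }
}
```

Longer timeouts only give each request more room to finish; they do not reduce the load that 5000 simultaneous requests place on the queue.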

This is how the request is filled prior to being sent:

// Microsoft.WindowsAzure.Storage.Queue.QueueRequestOptions
internal void ApplyToStorageCommand<T>(RESTCommand<T> cmd)
{
    if (this.LocationMode.HasValue)
    {
        cmd.LocationMode = this.LocationMode.Value;
    }
    if (this.ServerTimeout.HasValue)
    {
        cmd.ServerTimeoutInSeconds = new int?((int)this.ServerTimeout.Value.TotalSeconds);
    }
    if (this.OperationExpiryTime.HasValue)
    {
        cmd.OperationExpiryTime = this.OperationExpiryTime;
        return;
    }
    if (this.MaximumExecutionTime.HasValue)
    {
        cmd.OperationExpiryTime = new DateTime?(DateTime.Now + this.MaximumExecutionTime.Value);
    }
}

And this is how the response is checked once the request has been sent:

rESTCommand.PreProcessResponse = delegate(RESTCommand<NullType> cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
{
    HttpResponseParsers.ProcessExpectedStatusCodeNoException<NullType>(HttpStatusCode.Created, resp, NullType.Value, cmd, ex);
    return NullType.Value;
};

Yuval Itzchakov answered Sep 28 '22 07:09