I try to enqueue messages in Azure Queues asynchronously like this:
private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
var tasks = messages.Select(msg => _queue.AddMessageAsync(new CloudQueueMessage(msg),
null, null, null, null));
await Task.WhenAll(tasks);
}
If I get it right this says "start enqueuing one item after the other without waiting them to get posted, keep a reference for each task and then wait until all get posted".
This code works fine in most cases, but for a large number of items (5000), it starts enqueuing and then throws a timeout exception (after having enqueued ~3500 items).
I solved it by waiting each one to finish before continuing with the next one
private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
foreach (var message in messages) {
await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, null, null);
}
}
Can anyone explain why this happened?
Exception:
System.AggregateException
which wraps many such exceptions:Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass4.<CreateCallbackVoid>b__3(IAsyncResult ar)
Request Information RequestID: RequestDate: StatusMessage: <--- ---> (Inner Exception #1) Microsoft.WindowsAzure.Storage.StorageException: The client could not finish the operation within specified timeout. ---> System.TimeoutException: The client could not finish the operation within specified timeout. --- End of inner exception stack trace --- Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result)`.
Storage queues provide a uniform and consistent programming model across queues, tables, and BLOBs – both for developers and for operations teams. Service Bus queues provide support for local transactions in the context of a single queue.
Queues offer First In, First Out (FIFO) message delivery to one or more competing consumers. That is, receivers typically receive and process messages in the order in which they were added to the queue.
A queue in Azure is designed have a throughput of 2000 messages per second.
See: Azure Storage Scalability and Performance Targets
When your application reaches the limit of what a partition can handle for your workload, Azure Storage will begin to return error code 503 (Server Busy) or error code 500 (Operation Timeout) responses. When this occurs, the application should use an exponential backoff policy for retries. The exponential backoff allows the load on the partition to decrease, and to ease out spikes in traffic to that partition.
It seems that you can make a more robust mechanism by passing a QueryRequestOptions
to AddMessageAsync
.
Before the query is sent, the request message adds these properties to the command.
I would try passing QueryRequestOptions
and setting a value to MaximumExecutionTime
and ServerTimeout
with a larger value.
This is how the request is filled prior to being sent:
// Microsoft.WindowsAzure.Storage.Queue.QueueRequestOptions
internal void ApplyToStorageCommand<T>(RESTCommand<T> cmd)
{
if (this.LocationMode.HasValue)
{
cmd.LocationMode = this.LocationMode.Value;
}
if (this.ServerTimeout.HasValue)
{
cmd.ServerTimeoutInSeconds = new int?((int)this.ServerTimeout.Value.TotalSeconds);
}
if (this.OperationExpiryTime.HasValue)
{
cmd.OperationExpiryTime = this.OperationExpiryTime;
return;
}
if (this.MaximumExecutionTime.HasValue)
{
cmd.OperationExpiryTime = new DateTime?(DateTime.Now + this.MaximumExecutionTime.Value);
}
}
And this is how it's sent:
rESTCommand.PreProcessResponse = delegate(RESTCommand<NullType> cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
{
HttpResponseParsers.ProcessExpectedStatusCodeNoException<NullType>(HttpStatusCode.Created, resp, NullType.Value, cmd, ex);
return NullType.Value;
};
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With