I don't understand why, but there appears to be no mechanism in the client library for running many queries in parallel against Windows Azure Table Storage. I've created a template class that can save considerable time, and you're welcome to use it however you wish. I would appreciate it, however, if you could pick it apart and give me feedback on how to improve it.
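```csharp
using System;
using System.Collections.Generic;
using System.Data.Services.Client;          // DataServiceQuery<T>
using System.Linq;                          // IQueryable<T>, Single()
using System.Threading;
using Microsoft.WindowsAzure.StorageClient; // CloudTableQuery<T>, ResultSegment<T>

public class AsyncDataQuery<T> where T : new()
{
    public AsyncDataQuery(bool preserve_order)
    {
        m_preserve_order = preserve_order;
        this.Queries = new List<CloudTableQuery<T>>(1000);
    }

    public void AddQuery(IQueryable<T> query)
    {
        var data_query = (DataServiceQuery<T>)query;
        var uri = data_query.RequestUri; // required
        this.Queries.Add(new CloudTableQuery<T>(data_query));
    }

    /// <summary>
    /// Blocking but still optimized: all queries run concurrently and
    /// this returns once every one of them has finished.
    /// </summary>
    public List<T> Execute()
    {
        this.BeginAsync();
        return this.EndAsync();
    }

    public void BeginAsync()
    {
        if (m_preserve_order)
        {
            // One placeholder per query; the callback for query i overwrites slot i.
            this.Items = new List<T>(Queries.Count);
            for (var i = 0; i < Queries.Count; i++)
            {
                this.Items.Add(new T());
            }
        }
        else
        {
            this.Items = new List<T>(Queries.Count * 2);
        }

        m_completed = 0;
        m_error = null;
        m_wait = new ManualResetEvent(false);

        if (Queries.Count == 0)
        {
            m_wait.Set(); // nothing to run, so EndAsync must not block
            return;
        }

        for (var i = 0; i < Queries.Count; i++)
        {
            // The query index is passed along as the async state.
            Queries[i].BeginExecuteSegmented(callback, i);
        }
    }

    public List<T> EndAsync()
    {
        m_wait.WaitOne();
        m_wait.Dispose();
        if (m_error != null)
        {
            throw new InvalidOperationException("At least one query failed.", m_error);
        }
        return this.Items;
    }

    private List<T> Items { get; set; }
    private List<CloudTableQuery<T>> Queries { get; set; }
    private bool m_preserve_order;
    private ManualResetEvent m_wait;
    private int m_completed = 0;
    private Exception m_error;
    private readonly object m_lock = new object();

    private void callback(IAsyncResult ar)
    {
        int i = (int)ar.AsyncState;
        CloudTableQuery<T> query = Queries[i];
        try
        {
            var response = query.EndExecuteSegmented(ar);
            lock (m_lock)
            {
                if (m_preserve_order)
                {
                    // preserve ordering only supports one result per query
                    this.Items[i] = response.Results.Single();
                }
                else
                {
                    // add any number of items
                    this.Items.AddRange(response.Results);
                }
            }

            if (response.HasMoreResults)
            {
                // More data to pull: this query is not finished yet.
                query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
                return;
            }
        }
        catch (Exception ex)
        {
            // Keep the first failure and count the query as finished;
            // otherwise EndAsync would wait forever.
            Interlocked.CompareExchange(ref m_error, ex, null);
        }

        // Compare the value Interlocked.Increment returns; assigning it back
        // to m_completed would race with other callbacks and could lose a count.
        if (Interlocked.Increment(ref m_completed) == Queries.Count)
        {
            m_wait.Set();
        }
    }
}
```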
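The hand-rolled completion counter plus ManualResetEvent in the callback can also be expressed with System.Threading.CountdownEvent (.NET 4), which bundles the count and the wait handle. This is a minimal, self-contained sketch of that alternative, not a drop-in replacement: each query is simulated by a thread-pool work item (a stand-in for BeginExecuteSegmented), CountdownSketch is a hypothetical name, and ConcurrentBag<T> takes the place of the locked List<T>.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class CountdownSketch
{
    // Runs `queryCount` simulated queries concurrently and blocks until all finish.
    public static int[] RunAll(int queryCount)
    {
        var results = new ConcurrentBag<int>();

        // Wait() returns once Signal() has been called queryCount times.
        // A count of 0 creates the event already set, so Wait() cannot hang.
        // Deliberately not disposed: the last Signal() may still be finishing
        // on a pool thread when Wait() returns.
        var pending = new CountdownEvent(queryCount);

        for (int i = 0; i < queryCount; i++)
        {
            ThreadPool.QueueUserWorkItem(state =>
            {
                int index = (int)state;
                results.Add(index * 10); // pretend result of query #index
                pending.Signal();        // exactly one signal per finished query
            }, i);
        }

        pending.Wait();
        return results.ToArray(); // order is arbitrary
    }
}
```

In the real class, a callback would only signal once HasMoreResults is false; a callback that re-issues BeginExecuteSegmented for the next segment must not signal, or the count would reach zero early.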
Guess I'm late to the party. I would add two things:
Also, I think this is actually a better approach than the Task Parallel Library. I tried a Task-per-query approach before this; the code was more awkward, and it tended to leave a lot of threads active. I still haven't tested your code extensively, but at first blush it seems to work better.
I've put some work into a more-or-less complete rewrite of the code above. My rewrite removes all locking, supports client-side timeouts of hung transactions (rare, but it does happen, and it can really ruin your day), and adds some exception handling logic. There is a full solution with tests up on Bitbucket. The most relevant code lives in one file, though it does require some helpers from other parts of the project.
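The Bitbucket code isn't reproduced here, but one common way to drop the lock is to give each query its own result slot, so callbacks never share a list. The following is a hypothetical sketch of that idea plus a client-side timeout, not the rewrite described above: LockFreeGather is an illustrative name, and the queries are plain Func<List<string>> delegates run on the thread pool instead of Azure calls.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class LockFreeGather
{
    // Runs each query delegate on the thread pool and merges the results in
    // query order. No lock: each callback writes only its own array slot.
    public static List<string> Run(IList<Func<List<string>>> queries, TimeSpan timeout)
    {
        int count = queries.Count;
        var slots = new List<string>[count];
        var errors = new Exception[count];
        int remaining = count;
        // Not disposed: after a timeout, late callbacks may still call Set().
        var done = new ManualResetEvent(count == 0);

        for (int i = 0; i < count; i++)
        {
            int index = i; // copy, so each closure sees its own index
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    slots[index] = queries[index]();
                }
                catch (Exception ex)
                {
                    errors[index] = ex; // recorded instead of crashing the pool thread
                }
                // Interlocked.Decrement is a full fence, so the slot write above
                // happens before the final count is observed.
                if (Interlocked.Decrement(ref remaining) == 0)
                {
                    done.Set();
                }
            });
        }

        // Client-side timeout instead of blocking forever on a hung query.
        if (!done.WaitOne(timeout))
        {
            throw new TimeoutException("Not all queries completed in time.");
        }

        var failures = new List<Exception>();
        var merged = new List<string>();
        for (int i = 0; i < count; i++)
        {
            if (errors[i] != null) failures.Add(errors[i]);
            else merged.AddRange(slots[i]);
        }
        if (failures.Count > 0)
        {
            throw new AggregateException(failures);
        }
        return merged;
    }
}
```

Because results are merged slot by slot, this also preserves query order without the one-result-per-query restriction of preserve_order. The timeout only stops the caller from waiting; it does not cancel the hung request itself.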
Have you considered using the Task Parallel Library?
http://msdn.microsoft.com/en-us/library/dd537609.aspx
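If the TPL route is taken, Task.Factory.FromAsync can wrap an existing Begin/End pair instead of starting a task that blocks on a synchronous call; for genuinely asynchronous I/O that avoids holding a thread per pending query. A minimal sketch, assuming a hypothetical FakeQuery class with an APM-style BeginExecute/EndExecute pair standing in for CloudTableQuery<T> (the fake itself runs on pool threads). With the real library the same call would presumably take BeginExecuteSegmented/EndExecuteSegmented, but continuation tokens would still need handling, and this has not been tried against Azure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical APM-style query, standing in for CloudTableQuery<T>.
public sealed class FakeQuery
{
    private readonly List<int> _rows;
    public FakeQuery(List<int> rows) { _rows = rows; }

    public IAsyncResult BeginExecute(AsyncCallback callback, object state)
    {
        // Task implements IAsyncResult; its AsyncState is set to `state`.
        var task = Task.Factory.StartNew<List<int>>(s => new List<int>(_rows), state);
        if (callback != null)
        {
            task.ContinueWith(t => callback(t));
        }
        return task;
    }

    public List<int> EndExecute(IAsyncResult ar)
    {
        return ((Task<List<int>>)ar).Result; // rethrows (wrapped) if the query failed
    }
}

public static class TplSketch
{
    public static List<int> RunAll(IEnumerable<FakeQuery> queries)
    {
        Task<List<int>>[] tasks = queries
            .Select(q => Task.Factory.FromAsync<List<int>>(q.BeginExecute, q.EndExecute, null))
            .ToArray();
        Task.WaitAll(tasks); // throws AggregateException if any query failed
        // Tasks stay in query order, so results come back in order without a lock.
        return tasks.SelectMany(t => t.Result).ToList();
    }
}
```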