Problem: I would like to download 100 files in parallel from AWS S3 using their .NET SDK. The downloaded content should be stored in 100 memory streams (the files are small enough, and I can take it from there). I am getting confused between Task, IAsyncResult, Parallel.*, and the other approaches available in .NET 4.0.
If I try to solve the problem myself, off the top of my head I imagine something like this pseudocode: (edited to add types to some variables)
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;
// Prepare to launch requests
var asyncRequests = from rq in requestObjects
                    select _s3.BeginGetObject(rq, null, null);
// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();
// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched
                select _s3.EndGetObject(rq);
// Finish requests
var actualResponses = responses.ToList();
// Fetch data
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream();
    rp.ResponseStream.CopyTo(ms);
    return ms;
});
This code launches 100 requests in parallel, which is good. However, there are two problems:
So here I start thinking I am heading down the wrong path...
Help?
It's probably easier if you break the operation down into a method that will handle one request asynchronously and then call it 100 times.
To start, let's identify the final result you want. Since you'll be working with a MemoryStream, you'll want to return a Task<MemoryStream> from your method. The signature will look something like this:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
    GetObjectRequest request)
Because your AmazonS3 object implements the Asynchronous Design Pattern (paired Begin/End methods), you can use the FromAsync method on the TaskFactory class to wrap such a pair in a Task<T>, like so:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
    GetObjectRequest request)
{
    Task<GetObjectResponse> response =
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null);
    // But what goes here?
So you're already in a good place: you have a Task<T> which you can wait on, or attach a callback to that runs when the call completes. However, you need to somehow translate the GetObjectResponse produced by the Task<GetObjectResponse> into a MemoryStream.
To that end, you want to use the ContinueWith method on the Task<T> class. Think of it as the asynchronous version of the Select method on the Enumerable class: it's just a projection into another Task<T>, except that each time you call ContinueWith, you are potentially creating a new Task that runs that section of code.
With that, your method looks like the following:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
    GetObjectRequest request)
{
    // Start the task of downloading.
    Task<GetObjectResponse> response =
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null
        );
    // Translate.
    Task<MemoryStream> translation = response.ContinueWith(t => {
        // Dispose the response (not the task) once its stream has been copied.
        // t.Result rethrows here if the download itself failed.
        using (GetObjectResponse resp = t.Result) {
            var ms = new MemoryStream();
            resp.ResponseStream.CopyTo(ms);
            return ms;
        }
    });
    // Return the full task chain.
    return translation;
}
Note that in the above you could call the overload of ContinueWith that takes TaskContinuationOptions.ExecuteSynchronously, as it appears you are doing minimal work (I can't tell; the responses might be huge). When the continuation does so little work that starting a new task for it would be detrimental, pass TaskContinuationOptions.ExecuteSynchronously so that you don't waste time creating new tasks for minimal operations.
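As a minimal sketch of that overload, the following projects one task into another with a trivial continuation. The class and method names (ContinuationSketch, LengthAsync) are made up for illustration and are not part of the AWS SDK; only ContinueWith and TaskContinuationOptions come from the framework. The option is a request to the scheduler rather than a guarantee.

```csharp
using System;
using System.Threading.Tasks;

static class ContinuationSketch
{
    // Hypothetical helper: projects a Task<string> into a Task<int>
    // that holds the length of the string.
    public static Task<int> LengthAsync(Task<string> source)
    {
        // The continuation is trivial, so ask for it to run inline on the
        // thread that completes the antecedent instead of being queued
        // as a separate work item.
        return source.ContinueWith(
            t => t.Result.Length,
            TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

In GetMemoryStreamAsync the same second argument would be passed to response.ContinueWith, but only if copying the response stream is cheap enough to run on the completing thread.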
Now that you have a method that can translate one request into a Task<MemoryStream>, creating a wrapper that will process any number of them is simple:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    // Just call Select on the requests, passing our translation into
    // a Task<MemoryStream>.
    // Also, materialize here, so that the tasks are "hot" when
    // returned.
    return requests.Select(r => GetMemoryStreamAsync(s3, r)).
        ToArray();
}
In the above, you simply pass in a sequence of your GetObjectRequest instances and get back an array of Task<MemoryStream>. The fact that it returns a materialized sequence is important. If you don't materialize it before returning, then the tasks will not be created until the sequence is iterated through.
Of course, if you want this behavior, then by all means, just remove the call to .ToArray(), have the method return IEnumerable<Task<MemoryStream>>, and then the requests will be made as you iterate through the tasks.
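The difference between the lazy and the materialized sequence can be seen in a small sketch that has nothing to do with S3. DeferredSketch, StartWork and the Started counter are illustrative names only; the counter simply records how many tasks have been created at each point.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class DeferredSketch
{
    // Counts how many tasks have been created so far.
    public static int Started;

    static Task<int> StartWork(int n)
    {
        Interlocked.Increment(ref Started);
        return Task.Factory.StartNew(() => n * 2);
    }

    // Returns { tasks created before enumeration, tasks created after ToArray }.
    public static int[] Demo()
    {
        Started = 0;
        // Select is deferred: no task exists yet.
        IEnumerable<Task<int>> lazy =
            Enumerable.Range(1, 3).Select(n => StartWork(n));
        int before = Started;
        // Enumerating the sequence creates and starts all three tasks.
        Task<int>[] hot = lazy.ToArray();
        int after = Started;
        Task.WaitAll(hot);
        return new[] { before, after };
    }
}
```

One consequence worth keeping in mind with the lazy version: every fresh enumeration of the sequence runs the Select again, so iterating it twice would issue every request twice.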
From there, you can process them one at a time (using the Task.WaitAny method in a loop) or wait for all of them to be completed (by calling the Task.WaitAll method). An example of the latter would be:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
    Task.WaitAll(tasks);
    return tasks.Select(t => t.Result).ToList();
}
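For the other option, processing results one at a time as they arrive, a rough sketch of the Task.WaitAny loop might look like the following. WaitAnySketch, ProcessAsCompleted and the handle callback are hypothetical names chosen for this example; it takes any sequence of already-started tasks, such as the array returned by GetMemoryStreamsAsync.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

static class WaitAnySketch
{
    // Hypothetical helper: hands each stream to 'handle' as soon as
    // the task producing it has finished.
    public static void ProcessAsCompleted(
        IEnumerable<Task<MemoryStream>> tasks,
        Action<MemoryStream> handle)
    {
        List<Task<MemoryStream>> pending = tasks.ToList();
        while (pending.Count > 0)
        {
            // Blocks until at least one pending task has completed and
            // returns its index in the array that was passed in.
            int index = Task.WaitAny(pending.ToArray());
            Task<MemoryStream> done = pending[index];
            pending.RemoveAt(index);
            // Result rethrows (as an AggregateException) if the task faulted.
            handle(done.Result);
        }
    }
}
```

Rebuilding the array on every pass is fine for around 100 tasks; for much larger batches, attaching a continuation to each task would avoid the repeated copying.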
Also, it should be mentioned that this is a pretty good fit for the Reactive Extensions framework, as it is very well suited to an IObservable<T> implementation.
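As a rough idea of what that could look like, the sketch below merges a set of already-started tasks into a single observable stream of results. It assumes the Reactive Extensions package (System.Reactive) is referenced; RxSketch and ToObservableStreams are names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class RxSketch
{
    // Hypothetical helper: exposes the results of the tasks as one
    // observable sequence, in the order in which the tasks complete.
    public static IObservable<MemoryStream> ToObservableStreams(
        IEnumerable<Task<MemoryStream>> tasks)
    {
        // Each task becomes a single-element observable; Merge
        // interleaves them into one stream.
        return tasks.Select(t => t.ToObservable()).Merge();
    }
}
```

A caller would then subscribe with something like ToObservableStreams(tasks).Subscribe(ms => ..., ex => ..., () => ...), where the second callback receives the first failure and the third runs once every download has finished.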