Parallel batch file download from Amazon S3 using AWS S3 SDK for .NET

Problem: I would like to download 100 files in parallel from AWS S3 using their .NET SDK. The downloaded content should be stored in 100 memory streams (the files are small enough, and I can take it from there). I am getting confused between Task, IAsyncResult, Parallel.*, and the other approaches available in .NET 4.0.

If I try to solve the problem myself, off the top of my head I imagine something like this pseudocode: (edited to add types to some variables)

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetObject(rq);

// Finish requests
var actualResponses = responses.ToList();

// Fetch data
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms;
});

This code launches 100 requests in parallel, which is good. However, there are two problems:

  1. The last statement will download the files serially, not in parallel. There doesn't seem to be a BeginCopyTo()/EndCopyTo() method pair on Stream...
  2. The preceding statement will not return until all requests have responded. In other words, none of the files will start downloading until every request has been answered.

So here I start thinking I am heading down the wrong path...

Help?

DenNukem asked May 07 '12 18:05


1 Answer

It's probably easier if you break the operation down into a method that will handle one request asynchronously and then call it 100 times.

To start, let's identify the final result you want. Since you'll ultimately be working with a MemoryStream, your method should return a Task<MemoryStream>. The signature will look something like this:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)

Because your AmazonS3 object exposes Begin/End method pairs (the Asynchronous Programming Model, or APM), you can use the FromAsync method on the TaskFactory class to wrap such a pair in a Task<T>, like so:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null);

    // But what goes here?

So you're already in a good place: you have a Task<T> that you can wait on, or get a callback from, when the call completes. However, you still need to translate the GetObjectResponse produced by the Task<GetObjectResponse> into a MemoryStream.
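If you want to try the FromAsync wrapping without the AWS SDK, here is a minimal sketch that applies the same idea to the Stream.BeginRead/EndRead pair from the base class library. The class name FromAsyncSketch and the ReadAsync helper are made up for illustration; only the FromAsync call itself mirrors what is done with BeginGetObject/EndGetObject.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

static class FromAsyncSketch
{
    // Hypothetical helper: wraps the Stream.BeginRead/EndRead pair in a Task<int>
    // whose result is the number of bytes read into the buffer.
    public static Task<int> ReadAsync(Stream source, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            source.BeginRead, source.EndRead,
            buffer, 0, buffer.Length, null);
    }

    static void Main()
    {
        var source = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
        var buffer = new byte[16];

        // The task is already running when FromAsync returns.
        Task<int> read = ReadAsync(source, buffer);

        // Reading Result blocks until EndRead has been called.
        int count = read.Result;
        Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, count));
    }
}
```

The arguments between the End method and the final state object are passed straight through to the Begin method, which is why the S3 version only needs the single request argument.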

To that end, use the ContinueWith method on the Task<T> class. Think of it as the asynchronous version of the Select method on the Enumerable class: it's a projection into another Task<T>, except that each call to ContinueWith potentially creates a new Task that runs that section of code.
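To make the Select analogy concrete, here is a small sketch that projects a Task<int> into a Task<string>. The names ContinueWithSketch and Describe are made up for illustration and nothing in it depends on S3.

```csharp
using System;
using System.Threading.Tasks;

static class ContinueWithSketch
{
    // Hypothetical helper: projects the eventual int into a string,
    // the same way Select projects each element of a sequence.
    public static Task<string> Describe(Task<int> lengthTask)
    {
        // t is the completed antecedent; t.Result does not block here.
        return lengthTask.ContinueWith(t => "length = " + t.Result);
    }

    static void Main()
    {
        Task<int> length = Task.Factory.StartNew(() => "payload".Length);
        Task<string> description = Describe(length);

        Console.WriteLine(description.Result); // prints "length = 7"
    }
}
```

One thing to keep in mind: if the antecedent task faulted, reading t.Result inside the continuation rethrows, so the projected task ends up faulted as well.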

With that, your method looks like the following:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    // Start the task of downloading.
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null
        );

    // Translate.
    Task<MemoryStream> translation = response.ContinueWith(t => {
        // Dispose the response (not the task) once its stream is copied.
        using (GetObjectResponse resp = t.Result) {
            var ms = new MemoryStream(); 
            resp.ResponseStream.CopyTo(ms); 
            // Rewind so that callers read from the start of the data.
            ms.Position = 0;
            return ms;
        } 
    });

    // Return the full task chain.
    return translation;
}

Note that in the above you can pass TaskContinuationOptions.ExecuteSynchronously to an overload of ContinueWith, as it appears the continuation does minimal work (I can't tell; the responses might be huge). When a continuation is so cheap that scheduling a new task would cost more than the work itself, pass TaskContinuationOptions.ExecuteSynchronously so that it runs inline instead of being queued as a separate task.
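As a rough sketch of what that overload looks like, here is a cheap continuation attached with ExecuteSynchronously. The names ExecuteSynchronouslySketch and ParseInline are made up for illustration, and a TaskCompletionSource stands in for the download so the example runs on its own.

```csharp
using System;
using System.Threading.Tasks;

static class ExecuteSynchronouslySketch
{
    // Hypothetical helper: a cheap projection that asks the scheduler to run it
    // inline on the thread that completes the antecedent, where possible.
    public static Task<int> ParseInline(Task<string> textTask)
    {
        return textTask.ContinueWith(
            t => int.Parse(t.Result),
            TaskContinuationOptions.ExecuteSynchronously);
    }

    static void Main()
    {
        // Stand-in for an operation that completes later.
        var source = new TaskCompletionSource<string>();
        Task<int> parsed = ParseInline(source.Task);

        // Completing the antecedent triggers the continuation.
        source.SetResult("42");

        Console.WriteLine(parsed.Result); // prints 42
    }
}
```

For the S3 case the trade-off is that a long CopyTo running inline would tie up whichever thread completed the request, so this only makes sense if the responses really are small.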

Now that you have the method that can translate one request into a Task<MemoryStream>, creating a wrapper that will process any number of them is simple:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    // Just call Select on the requests, passing our translation into
    // a Task<MemoryStream>.
    // Also, materialize here, so that the tasks are "hot" when
    // returned.
    return requests.Select(r => GetMemoryStreamAsync(s3, r)).
        ToArray();
}

In the above, you simply take a sequence of your GetObjectRequest instances and it will return an array of Task<MemoryStream>. The fact that it returns a materialized sequence is important. If you don't materialize it before returning, then the tasks will not be created until the sequence is iterated through.

Of course, if you want this behavior, then by all means, just remove the call to .ToArray(), have the method return IEnumerable<Task<MemoryStream>> and then the requests will be made as you iterate through the tasks.
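The difference between the deferred and the materialized sequence is easy to see with a counter. In this sketch StartWork is a made-up stand-in for GetMemoryStreamAsync that just records how many operations have been started; the class name DeferredSketch is also only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class DeferredSketch
{
    static int started;

    public static int Started { get { return started; } }

    // Hypothetical stand-in for GetMemoryStreamAsync: counts each started operation.
    public static Task<int> StartWork(int id)
    {
        Interlocked.Increment(ref started);
        return Task.Factory.StartNew(() => id * 2);
    }

    static void Main()
    {
        // Deferred: Select only describes the work, nothing has started yet.
        IEnumerable<Task<int>> lazy = Enumerable.Range(1, 3).Select(i => StartWork(i));
        Console.WriteLine(Started); // prints 0

        // Materialized: ToArray enumerates once, so all three tasks are now hot.
        Task<int>[] hot = lazy.ToArray();
        Console.WriteLine(Started); // prints 3

        Task.WaitAll(hot);
        Console.WriteLine(hot.Sum(t => t.Result)); // prints 12
    }
}
```

Be careful with the deferred form: enumerating the same IEnumerable twice would call StartWork again for every element, which in the S3 case means issuing every request a second time.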

From there, you can process them one at a time (using the Task.WaitAny method in a loop) or wait for all of them to be completed (by calling the Task.WaitAll method). An example of the latter would be:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests)
{
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
    Task.WaitAll(tasks);
    return tasks.Select(t => t.Result).ToList();
}
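For the one-at-a-time approach, a Task.WaitAny loop could look roughly like the sketch below. The names WaitAnySketch and ProcessAsCompleted are made up for illustration, and the tasks in Main are trivial stand-ins for the Task<MemoryStream> instances.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class WaitAnySketch
{
    // Hypothetical helper: invokes handle once per task, in completion order.
    public static void ProcessAsCompleted<T>(
        IEnumerable<Task<T>> tasks, Action<T> handle)
    {
        List<Task<T>> pending = tasks.ToList();
        while (pending.Count > 0)
        {
            // Blocks until at least one pending task has finished and
            // returns its index in the array that was passed in.
            int index = Task.WaitAny(pending.ToArray());
            Task<T> finished = pending[index];
            pending.RemoveAt(index);

            // Result rethrows (as AggregateException) if the task faulted.
            handle(finished.Result);
        }
    }

    static void Main()
    {
        Task<int>[] tasks = Enumerable.Range(1, 3)
            .Select(i => Task.Factory.StartNew(() => i * 10))
            .ToArray();

        int total = 0;
        ProcessAsCompleted(tasks, value => total += value);
        Console.WriteLine(total); // prints 60
    }
}
```

Copying the pending list into a new array on every pass is wasteful for large batches, but for around 100 tasks it keeps the loop simple.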

Also, it should be mentioned that this is a pretty good fit for the Reactive Extensions framework, as it is well suited to an IObservable<T> implementation.

casperOne answered Sep 28 '22 15:09