How to create a parallel prefetch for a foreach

Given the numerous new ways of performing asynchronous operations in C# (TPL, Parallel Extensions, the Async CTP, Reactive Extensions), I was wondering what the simplest way would be to parallelize the fetching and processing portions of the following:

foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}

The proviso is that while files can be fetched at any time, ProcessFile can only handle one file at a time and must be called sequentially.

In short, what is the simplest way to get FetchFile and ProcessFile to behave in a pipelined way, i.e. to run concurrently?

asked Dec 19 '25 by chillitom
2 Answers

Here's the Rx way. This extension transforms a stream of URIs into a stream of response streams:

    public static IObservable<Stream> RequestToStream(this IObservable<string> source,
        TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }

Usage:

new[] { "http://myuri.net/file1.dat", "http://myuri.net/file2.dat" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream => ProcessStream(stream))
   .Subscribe();

Edit: oops, I hadn't noticed the requirement that the file writes be serialized. That part can be handled with .Concat, which is essentially an Rx queue (another option is .Zip).

Let's add a .StreamToFile extension:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        // Note: AsyncRead and WriteTo are Rx-style stream helper extensions
        // (not part of the BCL); they read the stream asynchronously and
        // write the resulting bytes to the destination file.
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }

Now you can run the web requests in parallel but serialize the file writes that come out of them:

        new[] { "http://myuri.net/file1.dat", "http://myuri.net/file2.dat" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();
answered Dec 21 '25 by Sergey Aldoukhov

Given the constraint on ProcessFile, I would say you should fetch the data asynchronously using the TPL and then enqueue a token that references the preloaded data. You can then have a background thread pull items off the queue and hand them to ProcessFile one at a time. This is a producer/consumer pattern.

For the queue, take a look at BlockingCollection<T>, which provides a thread-safe queue and, when given a bounded capacity, also lets you throttle the workload.
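The pattern above can be sketched roughly like this, assuming FetchFile and ProcessFile with the signatures implied by the question (the byte[] payload and the capacity of 10 are illustrative choices, not part of the question):

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Pipeline
{
    // A bounded capacity throttles the fetchers if processing falls behind.
    static readonly BlockingCollection<byte[]> queue =
        new BlockingCollection<byte[]>(boundedCapacity: 10);

    static void Run(string[] urls)
    {
        // Producer: fetch all files in parallel and enqueue the results.
        var producer = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(urls, url => queue.Add(FetchFile(url)));
            queue.CompleteAdding(); // tell the consumer no more items are coming
        });

        // Consumer: a single loop drains the queue, so ProcessFile is
        // only ever called sequentially, one file at a time.
        foreach (var file in queue.GetConsumingEnumerable())
            ProcessFile(file);

        producer.Wait(); // surface any fetch exceptions
    }

    static byte[] FetchFile(string url) { /* download */ return new byte[0]; }
    static void ProcessFile(byte[] file) { /* sequential processing */ }
}
```

GetConsumingEnumerable blocks while the queue is empty and completes once CompleteAdding has been called and the queue has drained, so the consumer loop exits cleanly when all fetches are done.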

answered Dec 21 '25 by Chris Taylor

