
Task Parallel Library WaitAny design

I've just begun to explore the TPL and have a design question.

My Scenario: I have a list of URLs that each refer to an image. I want each image to be downloaded in parallel. As soon as at least one image is downloaded, I want to execute a method that does something with the downloaded image. That method should NOT be parallelized -- it should be serial.

I think the following will work but I'm not sure if this is the right way to do it. Because I have separate classes for collecting the images and for doing "something" with the collected images, I end up passing around an array of Tasks which seems wrong since it exposes the inner workings of how images are retrieved. But I don't know a way around it. In reality there is more to both of these methods but that's not important for this. Just know that they really shouldn't be lumped into one large method that both retrieves and does something with the image.

//From the Director class
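//Keep the unfinished downloads in a list so completed ones can be removed
List<Task<Image>> downloadTasks = new List<Task<Image>>(collector.RetrieveImages(listOfURLs));

while (downloadTasks.Count > 0)
{
    //Wait for any of the remaining downloads to complete
    int completedIndex = Task.WaitAny(downloadTasks.ToArray());
    Image completedImage = downloadTasks[completedIndex].Result;
    //Remove it so the next WaitAny does not return the same task again
    downloadTasks.RemoveAt(completedIndex);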

    //Now do something with the image (this "something" must happen serially)
    //Uses the "Formatter" class to accomplish this let's say
}

///////////////////////////////////////////////////

//From the Collector class
public Task<Image>[] RetrieveImages(List<string> urls)
{
    Task<Image>[] tasks = new Task<Image>[urls.Count];

    int index = 0;
    foreach (string url in urls)
    {
        string lambdaVar = url;  //Required... Bleh
        //Build the path here so the lambda captures a per-iteration value, not the shared index
        string filePath = Path.Combine(Application.StartupPath,
            String.Format("{0}.png", index));
        tasks[index] = Task<Image>.Factory.StartNew(() =>
            {
                using (WebClient client = new WebClient())
                {
                    //TODO: Replace with live image locations
                    client.DownloadFile(lambdaVar, filePath);
                }

                return Image.FromFile(filePath);
            },
            TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent);

        index++;
    }

    return tasks;
}
asked Apr 10 '26 20:04 by colithium



2 Answers

Typically you use WaitAny to wait for one task when you don't care about the results of any of the others, for example if you only care about the first image that happens to be returned.
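
As a rough illustration of that "first result only" use of WaitAny, here is a minimal sketch. FakeDownload and its delays are hypothetical stand-ins for real downloads and are not part of the question's code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FirstResultDemo
{
    // Hypothetical stand-in for a download: sleeps, then returns a label.
    static string FakeDownload(string name, int delayMs)
    {
        Thread.Sleep(delayMs);
        return name;
    }

    public static string FirstFinished()
    {
        Task<string>[] tasks =
        {
            Task.Factory.StartNew(() => FakeDownload("slow", 2000)),
            Task.Factory.StartNew(() => FakeDownload("fast", 50))
        };

        // WaitAny blocks until at least one task has completed
        // and returns the index of a completed task.
        int winner = Task.WaitAny(tasks);
        return tasks[winner].Result;
    }

    public static void Main()
    {
        Console.WriteLine("First result: {0}", FirstFinished());
    }
}
```

The other task keeps running after WaitAny returns; its result is simply ignored here.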

How about this instead?

This creates two tasks. The first loads the images and adds them to a blocking collection. The second waits on the collection and processes each image as it is added to the queue. When all the images are loaded, the first task closes the queue so the second task can shut down.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Net;
using System.Threading.Tasks;

namespace ClassLibrary1
{
    public class Class1
    {
        readonly string _path = Directory.GetCurrentDirectory();

        public void Demo()
        {
            IList<string> listOfUrls = new List<string>();
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");

            BlockingCollection<Image> images = new BlockingCollection<Image>();

            Parallel.Invoke(
                () =>                   // Task 1: load the images
                {
                    Parallel.For(0, listOfUrls.Count, (i) =>
                        {
                            Image img = RetrieveImages(listOfUrls[i], i);
                            img.Tag = i;
                            images.Add(img);    // Add each image to the queue
                        });
                    images.CompleteAdding();    // Done with images.
                },
                () =>                   // Task 2: Process images serially
                {
                    foreach (var img in images.GetConsumingEnumerable())
                    {
                        string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag));
                        Console.WriteLine("Rotating image {0}", img.Tag);
                        img.RotateFlip(RotateFlipType.RotateNoneFlipXY);

                        img.Save(newPath);
                    }
                });
        }

        public Image RetrieveImages(string url, int i)
        {
            using (WebClient client = new WebClient())
            {
                string fileName = Path.Combine(_path, String.Format("{0}.png", i));
                Console.WriteLine("Downloading {0}...", url);
                // fileName is already a full path, so it is not combined with _path again
                client.DownloadFile(url, fileName);
                Console.WriteLine("Saved {0} as {1}.", url, fileName);
                return Image.FromFile(fileName);
            }
        } 
    }
}

WARNING: The code doesn't have any error checking or cancellation. It's late, and you need something to do, right? :)
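
One possible way to add those missing pieces is sketched below, under some assumptions: integers stand in for images, Load is a hypothetical replacement for RetrieveImages, and the two stages are started with Task.Factory.StartNew rather than Parallel.Invoke so that failures can be observed with WaitAll. Treat it as a starting point, not a finished implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-in for RetrieveImages: returns the square of the index.
    static int Load(int i) { return i * i; }

    public static List<int> Run(int count, CancellationToken token)
    {
        var queue = new BlockingCollection<int>();
        var processed = new List<int>();

        Task producer = Task.Factory.StartNew(() =>
        {
            try
            {
                var options = new ParallelOptions { CancellationToken = token };
                Parallel.For(0, count, options, i => queue.Add(Load(i)));
            }
            finally
            {
                // Always close the queue, even on failure or cancellation,
                // so the consumer is never left blocked.
                queue.CompleteAdding();
            }
        });

        Task consumer = Task.Factory.StartNew(() =>
        {
            // A single loop on a single task keeps the processing serial.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                processed.Add(item);
            }
        });

        try
        {
            Task.WaitAll(producer, consumer);
        }
        catch (AggregateException ex)
        {
            // Treat cancellation as handled; anything else is rethrown.
            ex.Flatten().Handle(e => e is OperationCanceledException);
        }

        return processed;
    }

    public static void Main()
    {
        List<int> result = Run(5, CancellationToken.None);
        Console.WriteLine("Processed {0} items", result.Count);
    }
}
```

The try/finally around the producer is the important part: without it, an exception during loading would leave the consumer waiting on the queue forever. In this sketch the consumer still drains whatever was queued before cancellation; passing the token to GetConsumingEnumerable would stop it sooner.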

This is an example of the pipeline pattern. It assumes that getting an image is pretty slow and that the cost of locking inside the blocking collection isn't going to cause a problem because it happens relatively infrequently compared to the time spent downloading images.

You can read more about this and other patterns for parallel programming in our book at http://parallelpatterns.codeplex.com/. Chapter 7 covers pipelines, and the accompanying examples show pipelines with error handling and cancellation.

answered Apr 13 '26 08:04 by Ade Miller



The TPL already provides the ContinueWith method to run one task when another finishes. Task chaining is one of the main patterns the TPL uses for asynchronous operations.
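
In its simplest form, chaining looks roughly like the sketch below. The class and method names are made up for illustration; only Task.Factory.StartNew and ContinueWith come from the TPL.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    public static int Compute()
    {
        // Antecedent task: produces a value on a thread-pool thread.
        Task<int> first = Task.Factory.StartNew(() => 21);

        // Continuation: scheduled once 'first' finishes; 't' is the finished task.
        Task<int> second = first.ContinueWith(t => t.Result * 2);

        // Reading Result blocks until the whole chain has completed.
        return second.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Compute());  // prints 42
    }
}
```

Each continuation is attached to its own antecedent, so continuations of different tasks may run at the same time.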

The following method downloads a set of images and continues by renaming each downloaded file:

static void DownloadInParallel(string[] urls)
{
   var tempFolder = Path.GetTempPath();

   var downloads = from url in urls
                   select Task.Factory.StartNew<string>(() =>{
                       using (var client = new WebClient())
                       {
                           var uri = new Uri(url);
                           string file = Path.Combine(tempFolder,uri.Segments.Last());
                           client.DownloadFile(uri, file);
                           return file;
                       }
                   },TaskCreationOptions.LongRunning|TaskCreationOptions.AttachedToParent)
                  .ContinueWith(t=>{
                       var filePath = t.Result;
                       File.Move(filePath, filePath + ".test");
                  },TaskContinuationOptions.ExecuteSynchronously);

    var results = downloads.ToArray();
    Task.WaitAll(results);
}

You should also check the WebClient Async Tasks from the ParallelExtensionsExtras samples. The DownloadXXXTask extension methods handle both the creation of tasks and the asynchronous downloading of files.

The following method uses the DownloadDataTask extension to get each image's data and flip it before saving it to disk:

static void DownloadInParallel2(string[] urls)
{
    var tempFolder = Path.GetTempPath();

    var downloads = from url in urls
         let uri=new Uri(url)
         let filePath=Path.Combine(tempFolder,uri.Segments.Last())
         select new WebClient().DownloadDataTask(uri)                                                        
         .ContinueWith(t=>{
            var img = Image.FromStream(new MemoryStream(t.Result));
            img.RotateFlip(RotateFlipType.RotateNoneFlipY);
            img.Save(filePath);
         },TaskContinuationOptions.ExecuteSynchronously);
        
    var results = downloads.ToArray();
    Task.WaitAll(results);
}
answered Apr 13 '26 09:04 by Panagiotis Kanavos



