Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Dataflow with splitting work to small jobs and then group again

I need to do this kind of work:

  1. Get Page object from database
  2. For each page get all images and process them (IO bound, for example, upload to CDN)
  3. If all images proceeded successfully then mark Page as processed in database

Since I need to control how much Pages I process in parallel I've decided to go with TPL Dataflows:

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|

Now I need the "Process images" to process images in parallel but I want to limit how much images I've processing across all parallel pages in work currently.

I can use TrasnformManyBlock for "Process images" but how do I gather them back in "Save page" block?

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load images             |
    | TransformManyBlock<Page, Image[]> |
    | BoundedCapacity = 1               |
    | MaxDegreeOfParallelism = 8        |
    |___________________________________|
      /              |              \
   ______________________________________________
 _|____________________________________________  |
|              Process image                   | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1                          | |
| MaxDegreeOfParallelism = 8                   |_|
|______________________________________________|
      \              |               /
         How to group images by page ?
                     |
        ____________________________
       |        Save page           |
       | ActionBlock<Page>          |
       | BoundedCapacity = 1        |
       | MaxDegreeOfParallelism = 5 |
       |____________________________|

On top of that potentially one of the images could fail to be proceed and I don't want to save page with failed images.

like image 703
Michael Logutov Avatar asked Oct 25 '14 07:10

Michael Logutov


1 Answers

You can group the images together by recording whenever an image for a given page arrives and then sending the page on when all images arrived. To figure that out, page needs to know how many images it contains, but I assume you know that.

In code, it could look something like this:

public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    var dictionary = new Dictionary<TMerged, int>();

    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count);
            count++;
            if (getSplitCount(merged) == count)
            {
                dictionary.Remove(merged);
                return new[] { merged };
            }

            dictionary[merged] = count;
            return new TMerged[0];
        });
}

Usage:

var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => /* save the page here */,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
like image 126
svick Avatar answered Nov 12 '22 21:11

svick