I need to do this kind of work:
Since I need to control how much Pages I process in parallel I've decided to go with TPL Dataflows:
____________________________
| Data pipe |
| BufferBlock<Page> |
| BoundedCapacity = 1 |
|____________________________|
|
____________________________
| Process images |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 8 |
|____________________________|
|
____________________________
| Save page |
| ActionBlock<Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 5 |
|____________________________|
Now I need the "Process images" to process images in parallel but I want to limit how much images I've processing across all parallel pages in work currently.
I can use TrasnformManyBlock for "Process images" but how do I gather them back in "Save page" block?
____________________________
| Data pipe |
| BufferBlock<Page> |
| BoundedCapacity = 1 |
|____________________________|
|
___________________________________
| Load images |
| TransformManyBlock<Page, Image[]> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 8 |
|___________________________________|
/ | \
______________________________________________
_|____________________________________________ |
| Process image | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1 | |
| MaxDegreeOfParallelism = 8 |_|
|______________________________________________|
\ | /
How to group images by page ?
|
____________________________
| Save page |
| ActionBlock<Page> |
| BoundedCapacity = 1 |
| MaxDegreeOfParallelism = 5 |
|____________________________|
On top of that potentially one of the images could fail to be proceed and I don't want to save page with failed images.
You can group the images together by recording whenever an image for a given page arrives and then sending the page on when all images arrived. To figure that out, page needs to know how many images it contains, but I assume you know that.
In code, it could look something like this:
public static IPropagatorBlock<TSplit, TMerged>
CreaterMergerBlock<TSplit, TMerged>(
Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
var dictionary = new Dictionary<TMerged, int>();
return new TransformManyBlock<TSplit, TMerged>(
split =>
{
var merged = getMergedFunc(split);
int count;
dictionary.TryGetValue(merged, out count);
count++;
if (getSplitCount(merged) == count)
{
dictionary.Remove(merged);
return new[] { merged };
}
dictionary[merged] = count;
return new TMerged[0];
});
}
Usage:
var dataPipe = new BufferBlock<Page>();
var splitter = new TransformManyBlock<Page, ImageWithPage>(
page => page.LoadImages(),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
image =>
{
// process the image here
return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var merger = CreaterMergerBlock(
(ImageWithPage image) => image.Page, page => page.ImageCount);
var savePage = new ActionBlock<Page>(
page => /* save the page here */,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With