Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to mark a TPL dataflow cycle to complete?

Given the following setup in TPL dataflow.

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    }));

tileFilder.LinkTo(block);


using (new TraceTimer("Time"))
{
    dirBroadcast.Post(directory);

    block.LinkTo(new ActionBlock<FileInfo>((s) =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);

    }));
    block.Complete();
    block.Completion.Wait();

}

i am wondering how I can mark this to complete because of the cycle. A directory is posted to the dirBroadcast broadcaster which posts to the dirfinder that might post back new dirs to the broadcaster, so i cant simply mark it as complete because it would block any directories being added from the dirfinder. Should i redesign it to keep track of the number of dirs or is there anything for this in TPL.

like image 377
Poul K. Sørensen Avatar asked Sep 30 '14 21:09

Poul K. Sørensen


People also ask

Where can I find the source code for TPL Dataflow?

Source code for this post can be found on Github here. TPL Dataflow is a data processing library from Microsoft that came out years ago. It consists of different "blocks" that you compose together to make a pipeline. Blocks correspond to stages in your pipeline.

Can TPL Dataflow accommodate back-pressure and load-shedding?

The producer can either decide to carry on at the same speed but discard some messages or slow down to the speed of Block 1. In this way TPL Dataflow can easily accomodate both back-pressure and load-shedding. However, not all blocks pause when the block it feeds becomes full.

How to create custom block types in TPL Dataflow library?

Although the TPL Dataflow Library provides many predefined block types, you can create additional block types that perform custom behavior. Implement the ISourceBlock<TOutput> or ITargetBlock<TInput> interfaces directly or use the Encapsulate method to build a complex block that encapsulates the behavior of existing block types.

How do I implement a job queue in TPL Dataflow?

To start with TPL Dataflow, you will need to add the System.Threading.Tasks.Dataflow NuGet. Once done, here is our first Job Queue implementation: As you can see, this is as simple as it gets. In fact, the little wrapper class can be removed entirely. A single ActionBlock naturally acts like a full-blown Job Queue with a single dedicated thread.


1 Answers

If the purpose of your code is to traverse the directory structure using some sort of parallelism then I would suggest not using TPL Dataflow and use Microsoft's Reactive Framework instead. I think it becomes much simpler.

Here's how I would do it.

First define a recursive function to build the list of directories:

Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);

This performs the recurse of the directories and uses the default Rx scheduler which causes the observable to run in parallel.

So by calling recurse with an input DirectoryInfo I get an observable list of the input directory and all of its descendants.

Now I can build a fairly straight-forward query to get the results I want:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));

Now I can action the query like this:

query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });

Now I may have missed a little bit in your custom code but if this is an approach you want to take I'm sure you can fix any logical issues quite easily.

This code automatically handles completion when it runs out of child directories and files.

To add Rx to your project look for "Rx-Main" in NuGet.

like image 122
Enigmativity Avatar answered Sep 22 '22 10:09

Enigmativity