Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

A datablock to join a single result with multiple other results

In my application I want to join multiple strings with a dictionary of replacement values.

The readTemplateBlock gets fed with FileInfos and returns their contents as string.
The getReplacersBlock gets fed (once) with a single replacers dictionary.
The joinTemplateAndReplacersBlock should join each item of the readTemplateBlock with the one getReplacersBlock result.

In my current setup it requires me to post the same replacers dictionary again for each file I post.

// Build
var readTemplateBlock = new TransformBlock<FileInfo, string>(file => File.ReadAllText(file.FullName));
var getReplacersBlock = new WriteOnceBlock<IDictionary<string, string>>(null);
var joinTemplateAndReplacersBlock = new JoinBlock<string, IDictionary<string, string>>();

// Assemble
var propagateComplete = new DataflowLinkOptions {PropagateCompletion = true};

readTemplateBlock.LinkTo(joinTemplateAndReplacersBlock.Target1, propagateComplete);
getReplacersBlock.LinkTo(joinTemplateAndReplacersBlock.Target2, propagateComplete);
joinTemplateAndReplacersBlock.LinkTo(replaceTemplateBlock, propagateComplete);

// Post
foreach (var template in templateFilenames)
{
    getFileBlock.Post(template);
}
getFileBlock.Complete();

getReplacersBlock.Post(replacers);
getReplacersBlock.Complete();

Is there a better block I'm missing? Maybe a configuration option I overlooked?

like image 861
Boris Callens Avatar asked Oct 17 '22 13:10

Boris Callens


1 Answers

I couldn't figure out how to do this using the built-in dataflow blocks. The alternatives I can see:

  1. Use a BufferBlock with small BoundedCapacity along with a Task that keeps sending the value to it. How exactly does the Task get the value could vary, but if you like WriteOnceBlock, you could reuse and encapsulate it:

    static IPropagatorBlock<T, T> CreateWriteOnceRepeaterBlock<T>()
    {
        var target = new WriteOnceBlock<T>(null);
        var source = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = 1 });
    
        Task.Run(
            async () =>
            {
                var value = await target.ReceiveAsync();
    
                while (true)
                {
                    await source.SendAsync(value);
                }
            });
    
        return DataflowBlock.Encapsulate(target, source);
    }
    

    You would then use CreateWriteOnceRepeaterBlock<IDictionary<string, string>>() instead of new WriteOnceBlock<IDictionary<string, string>>(null).

  2. Write a custom block similar to WriteOnceBlock that behaves exactly the way you want. Looking at how big the source of WriteOnceBlock is, this is probably not very appealing.

  3. Use a TaskCompletionSource instead of dataflow blocks for this.

    Assuming your current code looks something like this (using C# 7 and the System.ValueTuple package for brevity):

    void ReplaceTemplateBlockAction(Tuple<string, IDictionary<string, string>> tuple)
    {
        var (template, replacers) = tuple;
        …
    }
    
    …
    
    var getReplacersBlock = new WriteOnceBlock<IDictionary<string, string>>(null);
    var replaceTemplateBlock = new ActionBlock<Tuple<string, IDictionary<string, string>>>(
        ReplaceTemplateBlockAction);
    …
    getReplacersBlock.Post(replacers);
    

    You would instead use:

    void ReplaceTemplateBlockAction(string template, IDictionary<string, string>>> replacers)
    {
        …
    }
    
    …
    
    var getReplacersTcs = new TaskCompletionSource<IDictionary<string, string>>();
    var replaceTemplateBlock = new ActionBlock<string>(
        async template => ReplaceTemplateBlockAction(template, await getReplacersTcs.Task));
    …
    getReplacersTcs.SetResult(replacers);
    
like image 185
svick Avatar answered Oct 21 '22 04:10

svick