TPL Dataflow how to synchronize with "global" data

I'm currently learning about TPL Dataflow. Whenever I read something about it, I think, "OK, sounds great", but then I often ask myself: "What if I have some kind of manager handling different sessions? These sessions can be updated by certain messages. In a complex TPL Dataflow mesh I would have to build in a synchronization mechanism for accessing the manager, which will slow down or block the mesh."

Using a management object with TPL Dataflow feels kind of wrong.

Can anybody give me some hints (links, books, examples, ...) pointing in the "right" direction for solving the above example?

Moerwald asked Feb 01 '26 21:02
1 Answer

This is a rather broad question without some code. But usually you either pass some "state" object through the pipeline, as a tuple or a DTO (which I personally consider a leaky design), or you inject additional blocks into your pipeline.
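As a quick sketch of the first option (the `SessionState` type and the doubling step are illustrative, not from the question): the state travels alongside the payload through each block, so no block has to reach out to a shared manager.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// hypothetical DTO for this sketch
public record SessionState(Guid SessionId, int UpdateCount);

public static class StatePassingSketch
{
    public static TransformBlock<(SessionState State, int Payload), (SessionState, int)> Build() =>
        new(msg =>
        {
            // derive a new state instead of mutating shared data,
            // so no lock is needed inside the mesh
            var next = msg.State with { UpdateCount = msg.State.UpdateCount + 1 };
            return (next, msg.Payload * 2);
        });
}
```

The downside, as noted above, is that every block's signature now carries the state type, which is why this can feel like a leaky design.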

For example, you can create a WriteOnceBlock, providing it with a session value unique to each pipeline, and simply read that value from blocks that need it during execution. This may be an option for you if you create a separate pipeline per session, but if you have one big pipeline, you need another way.

For example, say you have a BufferBlock, an ActionBlock that performs the session update, and a TransformBlock that simply continues your normal pipeline execution. What you can do in this case is introduce a BroadcastBlock: link the buffer to it, then link the broadcast block to the session-update action with a predicate, and to the transform block directly.

The idea here is that if a message matches the predicate (so a session update is needed), the broadcast block delivers it both to the session-update action and to your normal execution pipeline. If it does not match the predicate, it simply flows through your normal workflow.

PS: since this may be complicated in words, here is some sample code for such a scenario:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var buffer = new BufferBlock<int>();
// the broadcast block copies each message using the lambda given to its constructor;
// if you pass the same reference through, make sure access to it is thread-safe
var broadcast = new BroadcastBlock<int>(i => i);
buffer.LinkTo(broadcast, linkOptions);

// session update block
var sessionUpdate = new ActionBlock<int>(i =>
    {
        Thread.Sleep(new Random().Next(1000));
        Console.WriteLine($"Session update: {i}");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// normal execution
var transform = new TransformBlock<int, int>(i =>
    {
        Thread.Sleep(new Random().Next(1000));
        Console.WriteLine($"Normal execution: {i}");
        return i;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// the session link is filtered: messages are accepted only for multiples of 3;
// completion is deliberately not propagated here, so the block is completed manually below
broadcast.LinkTo(sessionUpdate, i => i % 3 == 0);
// normal pipeline with completion propagation
broadcast.LinkTo(transform, linkOptions);

for (var i = 0; i < 10; ++i)
{
    // post messages asynchronously
    await buffer.SendAsync(i);
}
buffer.Complete();
await transform.Completion;
// complete the standalone session block explicitly so pending updates can finish
sessionUpdate.Complete();
await sessionUpdate.Completion;
VMAtm answered Feb 03 '26 11:02