Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using AsObservable to observe TPL Dataflow blocks without consuming messages

I have a chain of TPL Dataflow blocks and would like to observe progress somewhere inside the system.

I am aware that I could just jam a TransformBlock into the mesh where I want to observe, get it to post to a progress updater of some variety and then return the message unchanged to the next block. I don't love this solution as the block would be purely there for its side-effect and I would also have to change the block linking logic wherever I want to observe.

So I wondered if I could use ISourceBlock<T>.AsObservable to observe the passing of messages within the mesh without altering it and without consuming the messages. This seems both a purer and more practical solution, if it worked.

From my (limited) understanding of Rx that means that I need the observable to be hot rather than cold, so that my progress updater sees the message but doesn't consume it. And .Publish().RefCount() seems to be the way to make an observable hot. However, it simply does not work as intended - instead either block2 or progress receives and consumes each message.

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

Result is non-deterministic but I get something mixed like this:

block2:21
progress:22
progress:24
block2:23
progress:25

So, am I doing something wrong, or is this impossible due to the way the way TPL Dataflow AsObservable is implemented?

I realise I could also replace the LinkTo between block1 and block2 with an Observable/Observer pair and that might work, but LinkTo with downstream BoundedCapacity = 1 is the whole reason I'm using TPL Dataflow in the first place.

edit: A few clarifications:

  • I did intend to set BoundedCapacity=1 in block2. While it's unnecessary in this trivial example, the downstream-constrained case is where I find TPL Dataflow really useful.
  • To clarify the solution I rejected in my second paragraph, it would be to add the following block linked in between block1 and block2:

    var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});

  • I would also like to maintain back-pressure so that if a further-upstream block was distributing work to block1 and also other equivalent workers, it wouldn't send work to block1 if that chain was already busy.

like image 471
theStrawMan Avatar asked Jun 16 '17 01:06

theStrawMan


2 Answers

The issue with your code is that you're wiring up two consumers of block1. Dataflow is then just giving a value to which ever consumer is there first.

So you need to broadcast the values from block1 into two other blocks to then be able to consume those independently.

Just a side note, don't do .Publish().RefCount() as it doesn't do what you think. It will effectively make a one run only observable that during that one run will allow multiple observers to connect and see the same values. It has nothing to do with the source of the data nor how the Dataflow blocks interact.

Try this code:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

That gives me:

block2:21
block2:22
block2:23
block2:24
block2:25
progress:21
progress:22
progress:23
progress:24
progress:25

Which is what I think you wanted.

Now, just as a further aside, using Rx for this might be a better option all around. It's much more powerful and declarative than any TPL or Dataflow option.

Your code boils down to this:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()));
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

That pretty much gives you same result.

like image 195
Enigmativity Avatar answered Nov 15 '22 07:11

Enigmativity


There are two options to consider when creating an observable dataflow block. You can either:

  1. emit a notification every time a message is processed, or
  2. emit a notification when a previously processed message stored in the block's output buffer, is accepted by a linked block.

Both options have pros and cons. The first option provides timely but unordered notifications. The second option provides ordered but delayed notifications, and also must deal with the disposability of the block-to-block linking. What should happen with the observable, when the link between the two blocks is manually disposed before the blocks are completed?

Below is an implementation of the first option, that creates a TransformBlock together with a non-consuming IObservable of this block. There is also an implementation for an ActionBlock equivalent, based on the first implementation (although it could also be implemented independently by copy-pasting and adapting the TransformBlock implementation, since the code is not that much).

public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var semaphore = new SemaphoreSlim(1);
    int startedIndexSeed = 0;
    int completedIndexSeed = 0;

    var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
        new DataflowBlockOptions() { BoundedCapacity = 100 });

    var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        var startedIndex = Interlocked.Increment(ref startedIndexSeed);
        var result = await transform(item).ConfigureAwait(false);
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // Send the notifications in synchronized fashion
            var completedIndex = Interlocked.Increment(ref completedIndexSeed);
            await notificationsBlock.SendAsync(
                (item, result, startedIndex, completedIndex)).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
        return result;
    }, dataflowBlockOptions);

    _ = transformBlock.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
        else notificationsBlock.Complete();
    }, TaskScheduler.Default);

    observable = notificationsBlock.AsObservable();
    // A dummy subscription to prevent buffering in case of no external subscription.
    observable.Subscribe(
        DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
        out observable, dataflowBlockOptions);
}

// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Func<TInput, Task> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateObservableTransformBlock<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        out var sourceObservable, dataflowBlockOptions);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    observable = sourceObservable
        .Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Action<TInput> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableActionBlock(
        item => { action(item); return Task.CompletedTask; },
        out observable, dataflowBlockOptions);
}

Usage example in Windows Forms:

private async void Button1_Click(object sender, EventArgs e)
{
    var block = CreateObservableTransformBlock((int i) => i + 20,
        out var observable,
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

    var vals = Enumerable.Range(1, 20).ToList();
    TextBox1.Clear();
    ProgressBar1.Value = 0;

    observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
    {
        TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
        ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
    }, onError: ex =>
    {
        TextBox1.AppendText($"An exception occured: {ex.Message}\r\n");
    },
    onCompleted: () =>
    {
        TextBox1.AppendText("The job completed successfully\r\n");
    });

    block.LinkTo(DataflowBlock.NullTarget<int>());

    foreach (var i in vals) await block.SendAsync(i);
    block.Complete();
}

In the above example the type of the observable variable is:

IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>

The two indices are 1-based.

like image 1
Theodor Zoulias Avatar answered Nov 15 '22 06:11

Theodor Zoulias