I'm using a TPL Dataflow block and I need to spy on the items passing through it for unit testing.
To do this, I call the AsObservable() method on the ISourceBlock<T> side of my TransformManyBlock<TInput, T>,
so that after execution I can check that each block of my pipeline has generated the expected values.
{
    ...
    var observer = new MyObserver<string>();
    _block = new TransformManyBlock<string, string>(MyHandler, options);
    _block.LinkTo(_nextBlock);
    _block.AsObservable().Subscribe(observer);
    _block.Post("Test");
    ...
}

public class MyObserver<T> : IObserver<T>
{
    public List<Exception> Errors = new List<Exception>();
    public bool IsComplete = false;
    public List<T> Values = new List<T>();

    public void OnCompleted()
    {
        IsComplete = true;
    }

    public void OnNext(T value)
    {
        Values.Add(value);
    }

    public void OnError(Exception e)
    {
        Errors.Add(e);
    }
}
So basically I subscribe my observer to the TransformManyBlock, and I expect each value passing through to be recorded in my observer's Values list.
But while IsComplete is set to true and OnError() successfully records exceptions,
the OnNext() method is never called unless the block is the last one in the pipeline.
I can't figure out why, because the _nextBlock linked to this source block does receive the data, which proves that data is leaving the block.
From what I understand, AsObservable() is supposed to report every value leaving the block, not only the values that have not been consumed by other linked blocks.
What am I doing wrong?
Your messages are being consumed by _nextBlock before your observer gets a chance to read them.
If you comment out the line _block.LinkTo(_nextBlock); it will likely work.
The sole purpose of AsObservable is to allow a block to be consumed from Rx. It doesn't change the internal workings of the block so that it broadcasts messages to multiple targets. You need a dedicated block for that: BroadcastBlock.
I would suggest broadcasting to another block and subscribing to that one instead.
"BroadcastBlock's mission in life is to enable all targets linked from the block to get a copy of every element published."
var options = new DataflowLinkOptions { PropagateCompletion = true };

// The broadcast block offers a copy of every message to each linked target.
var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));

broadcastBlock.LinkTo(bufferBlock, options);
broadcastBlock.LinkTo(actionBlock, options);

// Observe the buffer's copy without taking messages away from actionBlock.
bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));

for (var i = 0; i < 5; i++)
    await broadcastBlock.SendAsync(i.ToString());

broadcastBlock.Complete();
await actionBlock.Completion;
Output (the interleaving of the "peek" and "Action" lines can vary between runs):
peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4
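Applied to the unit-test scenario from the question, the same idea might look like the sketch below. It is only an illustration under a few assumptions: SpyObserver<T>, SpyPipeline, RunAsync and the word-splitting handler are hypothetical names standing in for MyObserver<T>, the test body and MyHandler, and an ActionBlock stands in for _nextBlock. The observer implements IObserver<T> directly, so no Rx package is needed for Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical variant of MyObserver<T>: records values and exposes a task
// that finishes when the observed block completes or faults.
public sealed class SpyObserver<T> : IObserver<T>
{
    private readonly TaskCompletionSource<bool> _done = new TaskCompletionSource<bool>();

    public List<T> Values { get; } = new List<T>();
    public Task Done => _done.Task;

    public void OnNext(T value) => Values.Add(value);
    public void OnCompleted() => _done.TrySetResult(true);
    public void OnError(Exception e) => _done.TrySetException(e);
}

public static class SpyPipeline
{
    // Runs one input through the pipeline and returns what the next block
    // received and what the spy observed.
    public static async Task<(List<string> Received, List<string> Spied)> RunAsync(string input)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Block under test; splitting into words is a stand-in for MyHandler.
        var block = new TransformManyBlock<string, string>(s => s.Split(' '));

        // Tee: offers a copy of every message to each linked target.
        var tee = new BroadcastBlock<string>(x => x);

        // Stand-in for _nextBlock; it processes one message at a time by default.
        var received = new List<string>();
        var nextBlock = new ActionBlock<string>(s => received.Add(s));

        // Unbounded buffer that holds the spy's copy of each message.
        var spyBuffer = new BufferBlock<string>();
        var observer = new SpyObserver<string>();

        block.LinkTo(tee, link);
        tee.LinkTo(nextBlock, link);
        tee.LinkTo(spyBuffer, link);
        spyBuffer.AsObservable().Subscribe(observer);

        block.Post(input);
        block.Complete();

        // Wait for both branches before inspecting the lists.
        await Task.WhenAll(nextBlock.Completion, observer.Done);
        return (received, observer.Values);
    }
}
```

A test could then call `var result = await SpyPipeline.RunAsync("a b c");` and compare result.Spied with result.Received. Both targets here are unbounded, so neither declines a message; with a bounded target, BroadcastBlock may drop messages that the target is not ready to accept.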