
TPL Dataflow, can I query whether a data block is marked complete but has not yet completed?

Given the following:

BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
    return element * 2;
});

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Feed some elements into the buffer block. Post is enough here because an
// unbounded BufferBlock accepts every item synchronously.
for (int i = 1; i <= 1000000; i++)
{
    sourceBlock.Post(i);
}

sourceBlock.Complete();

targetBlock.Completion.ContinueWith(_ =>
{
    //notify completion of the target block
});

The targetBlock never seems to complete. I think the reason is that all the items in the TransformBlock are waiting in its output queue, since I have not linked targetBlock to any other Dataflow block. What I actually want is a notification when (A) the targetBlock has been notified of completion AND (B) its input queue is empty. I do not care whether items still sit in the output queue of the TransformBlock.

How can I go about that? Is the only way to query the completion status of the sourceBlock AND make sure the InputCount of the targetBlock is zero? I am not sure this is very stable (is the sourceBlock truly only marked completed once its last item has been passed to the targetBlock?). Is there a more elegant and more efficient way to reach the same goal?

Edit: I just noticed that even the "dirty" way, checking that the sourceBlock has completed AND that the InputCount of the targetBlock is zero, is not trivial to implement. Where would that check sit? It cannot be inside the targetBlock, because once the above two conditions are met no message is processed within targetBlock anymore. Checking the completion status of the sourceBlock also introduces a lot of inefficiency.

Matt asked Nov 03 '22 10:11


1 Answer

I believe you can't directly do this. It's possible you could get this information from some private fields using reflection, but I wouldn't recommend doing that.

But you can do this by creating custom blocks. In the case of Complete() it's simple: just create a block that forwards each method to the original block, except for Complete(), which it also logs before forwarding.
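A minimal sketch of such a forwarding block, assuming the wrapped block is any IPropagatorBlock. The names CompletionTrackingBlock and IsCompletionRequested are hypothetical and not part of TPL Dataflow; instead of writing to a log, this version records the call in a flag that can be queried later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper (not a library type): forwards every member to the
// inner block and additionally remembers that Complete() was called.
public sealed class CompletionTrackingBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>
{
    private readonly IPropagatorBlock<TInput, TOutput> _inner;
    private int _completionRequested; // 0 = not requested, 1 = requested

    public CompletionTrackingBlock(IPropagatorBlock<TInput, TOutput> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    // True once Complete() has been called on this wrapper, even if the
    // inner block has not finished processing yet.
    public bool IsCompletionRequested => Volatile.Read(ref _completionRequested) == 1;

    public Task Completion => _inner.Completion;

    public void Complete()
    {
        Volatile.Write(ref _completionRequested, 1);
        _inner.Complete();
    }

    public void Fault(Exception exception) => _inner.Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
        => _inner.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
        => _inner.LinkTo(target, linkOptions);

    public TOutput ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target, out bool messageConsumed)
        => _inner.ConsumeMessage(messageHeader, target, out messageConsumed);

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
        => _inner.ReserveMessage(messageHeader, target);

    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
        => _inner.ReleaseReservation(messageHeader, target);
}
```

To use it, link the source to the wrapper rather than to the TransformBlock itself, for example sourceBlock.LinkTo(wrapper, new DataflowLinkOptions { PropagateCompletion = true }). Completion propagated over that link then goes through the wrapper's Complete(). Keeping a separate reference to the inner TransformBlock lets you combine the flag with its InputCount.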

In the case of figuring out when processing of all items is complete, you could link your block to an intermediate BufferBlock. This way, the output queue will be emptied quickly, so checking the Completion task of the internal block would give you a fairly accurate measurement of when the processing is complete. This would affect your measurements, but hopefully not significantly.
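The intermediate-buffer idea could look roughly like the sketch below, which reuses the block names from the question. The outputBuffer block, the DrainDemo class, and the smaller item count are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DrainDemo
{
    // Returns the number of results that ended up in the intermediate buffer.
    public static async Task<int> RunAsync(int itemCount)
    {
        var sourceBlock = new BufferBlock<int>();
        var targetBlock = new TransformBlock<int, int>(element => element * 2);

        // Hypothetical intermediate block: it exists only to drain
        // targetBlock's output queue so that targetBlock can complete.
        var outputBuffer = new BufferBlock<int>();

        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });
        // Completion is not propagated here; the buffer just collects results.
        targetBlock.LinkTo(outputBuffer);

        for (int i = 1; i <= itemCount; i++)
        {
            sourceBlock.Post(i);
        }
        sourceBlock.Complete();

        // Finishes once every input has been transformed and every output
        // has been handed over to outputBuffer.
        await targetBlock.Completion;

        return outputBuffer.Count;
    }

    public static async Task Main()
    {
        int buffered = await RunAsync(1000);
        Console.WriteLine($"targetBlock completed, {buffered} results buffered");
    }
}
```

If the results are not needed at all, linking to DataflowBlock.NullTarget<int>() instead of a BufferBlock discards them and avoids holding them in memory.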

Another option would be to add some logging at the end of the block's delegate. This way, you could see when processing of the last item was finished.

svick answered Nov 08 '22 06:11