
BatchBlock produces batch with elements sent after TriggerBatch()

I have a Dataflow pipeline consisting of several blocks. As elements flow through the pipeline, I want to group them by field A. To do this I use a BatchBlock with a high BoundedCapacity. Elements are stored in it until I decide they should be released, at which point I invoke the TriggerBatch() method.

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

    GroupingBlock.SendAsync(data).Wait(SendTimeout);
}

This is how it looks. The problem is that the produced batch sometimes contains the next posted element, which shouldn't be there.

To illustrate:

BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);

At this point I expect my batch to be {A,A,A}, but it is {A,A,A,B}.

It is as if TriggerBatch() were asynchronous and SendAsync() had actually executed before the batch was made.

How can I solve this? I obviously don't want to put Task.Wait(x) in there (I tried, and it works, but then performance is poor, of course).

wojciech_rak asked Oct 22 '25 15:10


1 Answer

I also ran into this issue by calling TriggerBatch in the wrong place. As mentioned, the SlidingWindow example that uses DataflowBlock.Encapsulate is the answer here, but it took some time to adapt, so I thought I'd share my completed block.

My ConditionalBatchBlock creates batches up to a maximum size, or sooner if a certain condition is met. In my specific scenario I needed batches of 100, but I always had to start a new batch when certain changes in the data were detected.

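using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
    // items collected for the batch currently being built; only the target
    // block touches it, and an ActionBlock handles one item at a time by default
    var queue = new Queue<T>();

    // holds finished batches until a linked block consumes them
    var source = new BufferBlock<T[]>();

    var target = new ActionBlock<T>(async item =>
    {
        // start a new batch if required by the condition
        // (skipped while the queue is empty, so no empty batch is sent)
        if (queue.Count > 0 && condition(queue, item))
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }

        queue.Enqueue(item);

        // always send a batch when the max size has been reached
        if (queue.Count == batchSize)
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }
    });

    // send any remaining items once the target has finished, then complete the source
    target.Completion.ContinueWith(async t =>
    {
        if (queue.Count > 0)
            await source.SendAsync(queue.ToArray());

        source.Complete();
    });

    return DataflowBlock.Encapsulate(target, source);
}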

The condition parameter may be simpler in your case. I needed to look at the queue as well as the current item to decide whether to create a new batch.

I used it like this:

public async Task RunExampleAsync<T>()
{
    var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));

    var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));

    conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

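    // ReadDataAsync is not shown; it sends every item to the block. The block must be
    // completed afterwards (Complete()), or awaiting actionBlock.Completion never returns.
    await ReadDataAsync<T>(conditionalBatchBlock);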

    await actionBlock.Completion;
}
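
For the scenario in the question (grouping by a field), the condition can probably be as simple as comparing the incoming item's key with the key of the items already queued. Below is a minimal, self-contained sketch of that idea, not the code from my project. The Element record, its Key property (standing in for "field A"), the GroupingDemo class and the RunAsync helper are all made up for illustration, and the block is a compact variant of the one above that uses Post instead of SendAsync, on the assumption that the internal BufferBlock stays unbounded. Records need C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical element type; Key stands in for the question's "field A".
public record Element(string Key, int Value);

public static class GroupingDemo
{
    // Compact variant of the conditional batch block shown earlier.
    static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(
        int batchSize, Func<Queue<T>, T, bool> condition)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();

        void Flush()
        {
            // An unbounded BufferBlock accepts every Post, so the result is ignored.
            if (queue.Count > 0) source.Post(queue.ToArray());
            queue.Clear();
        }

        var target = new ActionBlock<T>(item =>
        {
            if (condition(queue, item)) Flush();   // close the batch before adding
            queue.Enqueue(item);
            if (queue.Count == batchSize) Flush(); // close the batch at max size
        });

        // Emit whatever is left once the target is done, then complete the source.
        target.Completion.ContinueWith(_ => { Flush(); source.Complete(); });
        return DataflowBlock.Encapsulate(target, source);
    }

    // Sends the keys through the block and returns one summary line per batch.
    public static async Task<List<string>> RunAsync(IEnumerable<string> keys)
    {
        var summaries = new List<string>();

        // Start a new batch when the incoming key differs from the queued items' key.
        var batcher = CreateConditionalBatchBlock<Element>(
            100, (queue, item) => queue.Count > 0 && queue.Peek().Key != item.Key);

        var collector = new ActionBlock<Element[]>(batch =>
            summaries.Add($"{batch[0].Key} x{batch.Length}"));

        batcher.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        var index = 0;
        foreach (var key in keys)
            await batcher.SendAsync(new Element(key, index++));

        batcher.Complete();
        await collector.Completion;
        return summaries;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync(new[] { "A", "A", "A", "B", "B", "C" }))
            Console.WriteLine(line); // prints "A x3", "B x2", "C x1" on separate lines
    }
}
```

The condition checks queue.Count > 0 before calling Peek() because Queue<T>.Peek throws on an empty queue. Since the batch is closed inside the same ActionBlock that enqueues the next item, the item that triggers the new batch cannot end up in the previous one.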
Loren Paulsen answered Oct 25 '25 05:10


