Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Batching on duration or threshold using TPL Dataflow

I have implemented a producer..consumer pattern using TPL Dataflow. The use case is that code reads messages from the Kafka bus. For efficiency, we need to process messages in batches when going to the database.

Is there a way in TPL data flow to hold on to the message and fire whenever a size or duration threshold is hit?

Example, the current implementation post the message once it is pulled from the queue.

    postedSuccessfully = targetBuffer.Post(msg.Value);
like image 643
Ashish Bhatia Avatar asked Oct 03 '18 18:10

Ashish Bhatia


1 Answers

Buffering by count and duration is already available through the System.Reactive and specifically, the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires.

Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods.

This makes building a buffering block very easy :

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);

}

This method uses two buffer blocks to buffer inputs and outputs. Buffer() reads from the input block (the observable) and writes to the output block (the observer) when either the batch is full or the timespan expires.

By default, Rx works on the current thread. By calling ObserveOn(TaskPoolScheduler.Default) we tell it to process data on a Task pool thread.

Example

This code creates a buffer block for 5 items or 1 second. It starts by posting 7 items, waits 1.1 seconds then posts another 7 items. Each batch is written to the console together with the thread ID :

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    await bufferBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

The output is :

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
like image 164
Panagiotis Kanavos Avatar answered Oct 06 '22 01:10

Panagiotis Kanavos