I have implemented a producer-consumer pattern using TPL Dataflow. The code reads messages from a Kafka bus. For efficiency, we need to process messages in batches when writing to the database.
Is there a way in TPL Dataflow to hold on to messages and fire whenever a size or duration threshold is hit?
For example, the current implementation posts each message as soon as it is pulled from the queue:
postedSuccessfully = targetBuffer.Post(msg.Value);
Buffering by count and duration is already available through System.Reactive, specifically the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires.
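To see Buffer on its own before any Dataflow blocks are involved, here is a minimal sketch. The class and method names (BufferDemo, Batches) are made up for illustration, and blocking with Wait() is only a convenience for a console demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferDemo
{
    // Collects every batch Buffer produces for a finite source of `total` integers.
    public static IList<IList<int>> Batches(int total, int count, TimeSpan timeSpan)
    {
        return Observable.Range(0, total)
            .Buffer(timeSpan, count)   // emit when `count` items arrive or `timeSpan` elapses
            .ToList()                  // gather all batches once the source completes
            .Wait();                   // block until done (acceptable in a demo only)
    }

    public static void Main()
    {
        foreach (var batch in Batches(7, 5, TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine(string.Join(",", batch));
        }
    }
}
```

No batch can hold more than `count` items, and whatever is left over is flushed when the source completes.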
Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods.
This makes building a buffering block very easy:
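using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TIn, IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan, int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver = outBlock.AsObserver();
    inBlock.AsObservable()
           .Buffer(timeSpan, count)
           .ObserveOn(TaskPoolScheduler.Default)
           .Subscribe(outObserver);

    // Expose the pair as one block: inputs go to inBlock, batches come out of outBlock
    return DataflowBlock.Encapsulate(inBlock, outBlock);
}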
This method uses two buffer blocks to buffer inputs and outputs. Buffer() reads from the input block (the observable) and writes to the output block (the observer) when either the batch is full or the timespan expires.
By default, Rx works on the current thread. By calling ObserveOn(TaskPoolScheduler.Default) we tell it to process data on a Task pool thread.
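One thing to watch for: when a timespan elapses with no incoming items, Buffer can emit an empty list. If downstream code should never see an empty batch, a Where filter can drop them. The following is a sketch of such a variant; the names BatchingBlocks and CreateNonEmptyBuffer are hypothetical, and the wiring is otherwise the same as CreateBuffer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

public static class BatchingBlocks
{
    // Hypothetical variant of CreateBuffer: same wiring, but empty batches are dropped.
    public static IPropagatorBlock<TIn, IList<TIn>> CreateNonEmptyBuffer<TIn>(TimeSpan timeSpan, int count)
    {
        var inBlock = new BufferBlock<TIn>();
        var outBlock = new BufferBlock<IList<TIn>>();

        inBlock.AsObservable()
               .Buffer(timeSpan, count)
               .Where(batch => batch.Count > 0)       // skip windows that elapsed with no items
               .ObserveOn(TaskPoolScheduler.Default)
               .Subscribe(outBlock.AsObserver());

        return DataflowBlock.Encapsulate(inBlock, outBlock);
    }
}
```

The filter sits before ObserveOn so that empty lists are discarded before they are scheduled onto a pool thread.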
Example
This code creates a buffer block for 5 items or 1 second. It starts by posting 7 items, waits 1.1 seconds, then posts another 7 items. Each batch is written to the console together with the thread ID:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static async Task Main(string[] args)
{
    // Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);
    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items => printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    // Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");
    for (int i = 0; i < 7; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i = 7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }

    bufferBlock.Complete();
    Console.WriteLine("Finishing");
    // Wait for the last block in the pipeline, so every batch has been printed
    await printBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}
The output is:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
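Applied to the scenario in the question, the consumer loop would keep posting msg.Value, only now into the buffering block, and an ActionBlock would write each batch to the database. The sketch below assumes a hypothetical SaveBatchAsync standing in for the real bulk insert and a plain sequence of strings standing in for the Kafka consumer. The batch size of 100 is arbitrary. CreateBuffer is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchedWriter
{
    // Same wiring as CreateBuffer above.
    public static IPropagatorBlock<TIn, IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan, int count)
    {
        var inBlock = new BufferBlock<TIn>();
        var outBlock = new BufferBlock<IList<TIn>>();
        inBlock.AsObservable()
               .Buffer(timeSpan, count)
               .ObserveOn(TaskPoolScheduler.Default)
               .Subscribe(outBlock.AsObserver());
        return DataflowBlock.Encapsulate(inBlock, outBlock);
    }

    // Hypothetical stand-in for the real bulk insert.
    public static Task SaveBatchAsync(IList<string> batch)
    {
        Console.WriteLine($"Saving {batch.Count} messages");
        return Task.CompletedTask;
    }

    // Feeds the messages through the batcher and returns how many were saved.
    public static async Task<int> RunAsync(IEnumerable<string> messages)
    {
        var saved = 0;
        var batcher = CreateBuffer<string>(TimeSpan.FromSeconds(1), 100);
        var writer = new ActionBlock<IList<string>>(async batch =>
        {
            if (batch.Count == 0) return;   // nothing arrived during this window
            await SaveBatchAsync(batch);
            saved += batch.Count;           // safe: ActionBlock handles one batch at a time by default
        });
        batcher.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in messages)
        {
            batcher.Post(value);            // in the real consumer loop: batcher.Post(msg.Value)
        }

        batcher.Complete();
        await writer.Completion;            // wait until the final partial batch is written
        return saved;
    }
}
```

Awaiting the writer's Completion instead of the batcher's ensures the final partial batch has actually been written before shutdown.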