In my WPF application using .NET 4.6, I have an event which fires new data points at a high rate (several hundred per second), but not all the time. This data is displayed in a chart.
I would like to update the chart every 50 ms and not after each new data point.
To achieve that, I thought of using Buffer(TimeSpan.FromMilliseconds(50)) from Rx, which in theory works fine. But my subscriber is also called every 50 ms when no new data points have arrived, which is not exactly what I want.
I created a little sample application to test that out:
using System;
using System.Reactive.Linq;

namespace RxTester
{
    public class Program
    {
        private static event EventHandler TheEvent;

        static void Main(string[] args)
        {
            var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
            var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
                .Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));

            var random = new Random();
            var timer = new System.Timers.Timer(2000)
            {
                AutoReset = true,
                Enabled = true
            };
            timer.Elapsed += (s, e) =>
            {
                var amount = random.Next(1, 10);
                for (int i = 0; i < amount; ++i)
                    TheEvent?.Invoke(null, null);
            };

            Console.ReadLine();
            timer.Enabled = false;
            subscriber.Dispose();
        }
    }
}
You need to add the "Rx-Linq" NuGet package for it to run or use the following Fiddle: https://dotnetfiddle.net/TV5tD4
There you see several "0 elements received" lines, which is what I would like to avoid. I know I could simply check for e.Count == 0, but as I use several such buffers, this does not seem optimal to me.
Is there a way to only create new buffered blocks of elements if elements are available?
I am also open to other approaches to solve my problem of batching events on a time basis. I already looked into TPL Dataflow's BatchBlock, but that seems to support only count-based block sizes.
Once again we can use the powerful GroupByUntil method to create this extension:
public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>
    (this IObservable<TSource> source,
     TimeSpan threshold)
{
    return source.Publish(sp =>
        sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
          .SelectMany(i => i.ToList()));
}
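One behavioral detail worth noting: GroupByUntil only opens a group when an element arrives, so the timer starts at the first element of each batch rather than ticking on a fixed cadence. The following is a minimal sketch of how this could be checked deterministically. The optional scheduler parameter and the class name are assumptions added here for illustration; they are not part of the answer's extension.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class BufferWhenAvailableSketch
{
    // Same operator as above; the optional scheduler is an assumption added
    // only so that virtual time can be used when testing.
    public static IObservable<IList<T>> BufferWhenAvailable<T>(
        this IObservable<T> source,
        TimeSpan threshold,
        IScheduler scheduler = null)
    {
        var timerScheduler = scheduler ?? Scheduler.Default;

        return source.Publish(sp =>
            // A single group (key is always true) is opened by the first element
            // and closed when the timer fires, 'threshold' after that element.
            sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold, timerScheduler))
              // Each closed group is collected into one list.
              .SelectMany(g => g.ToList()));
    }
}
```

With the question's sample this would be used as observable.BufferWhenAvailable(TimeSpan.FromMilliseconds(50)).Subscribe(...). No batch is produced while the source is idle, because no group exists to be closed.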
The standard way of doing this is simply:
    .Buffer(period)
    .Where(buffer => buffer.Any())
So this is effectively the check you want to avoid (Count == 0). However, the check is very cheap, and I would imagine it is far cheaper than the other costs involved, i.e. scheduling. The only concern might be the number of allocations (a new List<T> is created every 50 ms) and the GC Gen0 pressure that may build up as a result.
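If the repeated check across several buffers is the main objection, it can be written once as an extension method. This is a minimal sketch; the name BufferNonEmpty and the optional scheduler parameter are hypothetical and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class NonEmptyBufferExtensions
{
    // Hypothetical helper: time-based Buffer followed by a filter that drops
    // the empty lists produced while the source is idle.
    public static IObservable<IList<T>> BufferNonEmpty<T>(
        this IObservable<T> source,
        TimeSpan period,
        IScheduler scheduler = null)
    {
        return source
            .Buffer(period, scheduler ?? Scheduler.Default)
            // Count is O(1) on IList<T>, so the filter itself is cheap.
            .Where(buffer => buffer.Count > 0);
    }
}
```

Each chart subscription then becomes observable.BufferNonEmpty(TimeSpan.FromMilliseconds(50)).Subscribe(...). Note that the empty lists are still allocated every period; they are only filtered out before reaching the subscriber.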