
Rx Buffer without empty calls to subscriber

In my WPF application using .NET 4.6 I have an event which fires new data points at a high rate (several hundred per second), but not all the time. This data is displayed in a chart.

I would like to update the chart every 50 ms and not after each new data point.
To achieve that I thought of using Buffer(TimeSpan.FromMilliseconds(50)) from Rx, which in theory works fine. However, my subscriber is also called every 50 ms when no new data points have been created, which is not exactly what I want.

I created a little sample application to test that out:

using System;
using System.Reactive.Linq;

namespace RxTester
{
    public class Program
    {
        private static event EventHandler TheEvent;

        static void Main(string[] args)
        {
            // Wrap the plain .NET event in an observable sequence.
            var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
            // Emit one buffer per second; empty buffers are emitted as well.
            var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
                .Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));

            var random = new Random();
            var timer = new System.Timers.Timer(2000)
                {
                    AutoReset = true,
                    Enabled = true
                };
            // Every 2 seconds, raise the event in a burst of 1 to 9 invocations.
            timer.Elapsed += (s, e) =>
                {
                    var amount = random.Next(1, 10);
                    for (int i = 0; i < amount; ++i)
                        TheEvent?.Invoke(null, EventArgs.Empty);
                };

            Console.ReadLine();

            timer.Enabled = false;
            subscriber.Dispose();
        }
    }
}

You need to add the "Rx-Linq" NuGet package for it to run, or you can use the following Fiddle: https://dotnetfiddle.net/TV5tD4

There you see several "0 elements received" lines, which is what I would like to avoid. I know I could simply check for e.Count == 0, but as I use several such buffers this does not seem optimal to me.

Is there a way to only create new buffered blocks of elements if elements are available?
I am also open to other approaches to solve my problem of batching events on a time basis. I already looked into TPL Dataflow's BatchBlock, but that seems to support only count-based block sizes.

Christoph Fink asked Feb 24 '16 15:02


2 Answers

Once again we can use the powerful GroupByUntil method to create this extension:

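// Must be declared inside a static class; needs System, System.Collections.Generic and System.Reactive.Linq.
public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>(
    this IObservable<TSource> source,
    TimeSpan threshold)
{
    // A single group (key: true) opens with the first element and closes when its timer fires,
    // so no group, and therefore no list, is produced while the source is idle.
    return source.Publish(sp =>
        sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
          .SelectMany(i => i.ToList()));
}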
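
One consequence of this approach is that the timer starts when the first element of a window arrives, so windows are not aligned to a fixed clock and the subscriber is not called at all while the source is idle. Below is a minimal sketch to check that behavior. The scheduler parameter, the class name BufferWhenAvailableSketch and the Run method are additions made for this illustration and are not part of the extension above. HistoricalScheduler (from System.Reactive.Concurrency) is used only to make the timing deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferWhenAvailableSketch
{
    // Same shape as the extension above, plus a scheduler parameter
    // (an assumption of this sketch) so the timer can run on virtual time.
    public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>(
        this IObservable<TSource> source, TimeSpan threshold, IScheduler scheduler)
    {
        return source.Publish(sp =>
            sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold, scheduler))
              .SelectMany(group => group.ToList()));
    }

    // Hypothetical demo: returns the size of every buffer the subscriber received.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();
        var source = new Subject<int>();
        var sizes = new List<int>();

        using (source.BufferWhenAvailable(TimeSpan.FromMilliseconds(50), scheduler)
                     .Subscribe(buffer => sizes.Add(buffer.Count)))
        {
            source.OnNext(1);                                     // opens a 50 ms window
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(20));
            source.OnNext(2);                                     // falls into the same window
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(30));   // the window closes here
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200));  // idle: no window is open
            source.OnNext(3);                                     // opens a new window
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));   // the second window closes
        }

        return sizes; // expected: two buffers, of sizes 2 and 1
    }
}
```

In production code the scheduler argument can be dropped again, as in the original extension, so that Observable.Timer uses its default scheduler.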
supertopi answered Nov 07 '22 09:11


The standard way of doing this is simply

.Buffer(period)
.Where(buffer => buffer.Any())

So this effectively does what you wanted to avoid (checking for Count == 0). However, the check is very cheap, and I would imagine it is far cheaper than the other costs involved, i.e. scheduling. The only concern might be the number of allocations (a new List<T> every 50 ms) and the GC Gen 0 pressure that may build up as a result.
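
If the same check is needed on several buffers, it can be wrapped once in an extension method so that call sites stay clean. This is only a sketch: the names BufferNonEmptySketch, BufferNonEmpty and Run are hypothetical, and the scheduler parameter is added so the demo can run deterministically on a HistoricalScheduler instead of real time.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferNonEmptySketch
{
    // Hypothetical helper: fixed-period buffering that drops empty buffers.
    // The empty lists are still allocated by Buffer; they are only filtered out.
    public static IObservable<IList<T>> BufferNonEmpty<T>(
        this IObservable<T> source, TimeSpan period, IScheduler scheduler)
    {
        return source.Buffer(period, scheduler)
                     .Where(buffer => buffer.Count > 0);
    }

    // Hypothetical demo: returns the size of every buffer the subscriber received.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();
        var source = new Subject<int>();
        var sizes = new List<int>();

        using (source.BufferNonEmpty(TimeSpan.FromMilliseconds(50), scheduler)
                     .Subscribe(buffer => sizes.Add(buffer.Count)))
        {
            source.OnNext(1);
            source.OnNext(2);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));  // first period: two elements
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));  // second period: empty, filtered out
            source.OnNext(3);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));  // third period: one element
        }

        return sizes; // expected: two buffers, of sizes 2 and 1
    }
}
```

Unlike a window that opens on the first element, the periods here stay aligned to a fixed 50 ms clock, which may or may not matter for a chart update.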

Lee Campbell answered Nov 07 '22 09:11