
How to window/buffer IObservable<T> into chunks based on a Func<T>

Given a class:

class Foo { public DateTime Timestamp { get; set; } }

...and an IObservable<Foo>, with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into Lists based on those Timestamps?

I.e. each IList<Foo> should have five seconds of events, or whatever. I know I can use Buffer with a TimeSpan overload, but I need to take the time from the events themselves, not the wall clock. (Unless there is a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)

If I try to use the Observable.Buffer(this IObservable<Foo> source, IObservable<DateTime> bufferBoundaries) overload like so:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
    x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();

pub.Buffer(windows).Subscribe(x => x.Dump());  // LINQPad
pub.Connect();

...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.

E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]]
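
To pin down the grouping I'm after, here is a minimal sketch in plain C# (no Rx) that applies the same x - x % 5 flooring to an already-ordered list. The names DesiredChunks, WindowStart and Chunk are made up for illustration; this only states the expected result, it is not a streaming solution.

```csharp
using System;
using System.Collections.Generic;

public static class DesiredChunks
{
    // Floors a value to the start of its window, mirroring x - x % windowSize.
    public static long WindowStart(long value, long windowSize) =>
        value - value % windowSize;

    // Splits an ordered sequence into consecutive runs that share a window start.
    public static List<List<long>> Chunk(IEnumerable<long> ordered, long windowSize)
    {
        var chunks = new List<List<long>>();
        long? currentStart = null;
        foreach (var value in ordered)
        {
            var start = WindowStart(value, windowSize);
            if (currentStart != start)
            {
                // The item that opens a new window goes into the NEW chunk.
                chunks.Add(new List<long>());
                currentStart = start;
            }
            chunks[chunks.Count - 1].Add(value);
        }
        return chunks;
    }

    public static void Main()
    {
        foreach (var chunk in Chunk(new long[] { 0, 1, 10, 11, 15 }, 5))
            Console.WriteLine("[" + string.Join(", ", chunk) + "]");
    }
}
```

The point is that the element that changes the window start (10, then 15) belongs to the chunk it opens, not the one it closes.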

Alastair Maw asked Oct 15 '14 13:10


2 Answers

Here's an idea. The group key condition is the "window number" and I use GroupByUntil. This gives you the desired output in your example (and I've used an int stream just like that example - but you can substitute whatever you need to number your windows).

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));                  
            
        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        .Subscribe(Console.WriteLine);
        
        scheduler.Start();
    }
}

Notes

  1. We publish the source stream because we will subscribe more than once.
  2. This is a function to create a group key - use this to generate a window number from your item type.
  3. This is the group termination condition - use this to inspect the source stream for an item in another window. This means a window won't close until an element outside of it arrives, or the source stream terminates. That follows from your desired output, which requires looking at the next element after a window ends. If your source bears any relation to real time, you could merge this with an Observable.Timer+Select that outputs a null/default instance of your item type to close the window earlier.
  4. SelectMany puts the groups into lists and flattens the stream.

This example will run in LINQPad quite nicely if you include the NuGet package rx-testing. New up a Tests instance and just run the Test() method.
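
For the Foo type in the question, the same pattern might look roughly like the sketch below. It assumes the System.Reactive NuGet package is referenced and that Timestamp is public; ChunkByTimestamp and WindowNumber are hypothetical names introduced here, and the window number is simply the timestamp's ticks divided by the window length.

```csharp
// Sketch only: assumes the System.Reactive NuGet package is available.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public class Foo
{
    public DateTime Timestamp { get; set; }
}

public static class FooChunking
{
    // Hypothetical helper: number of whole windows elapsed since DateTime.MinValue.
    public static long WindowNumber(Foo foo, TimeSpan window) =>
        foo.Timestamp.Ticks / window.Ticks;

    // Hypothetical extension: same Publish + GroupByUntil + ToList shape as above.
    public static IObservable<IList<Foo>> ChunkByTimestamp(
        this IObservable<Foo> source, TimeSpan window) =>
        source.Publish(ps =>
            ps.GroupByUntil(
                f => WindowNumber(f, window),
                // A group ends when an item from a different window arrives.
                grp => ps.Where(f => WindowNumber(f, window) != grp.Key))
              .SelectMany(grp => grp.ToList()));
}

public static class Program
{
    public static void Main()
    {
        var start = new DateTime(2014, 10, 15, 13, 0, 0);
        var foos = new[] { 0, 1, 10, 11, 15 }
            .Select(s => new Foo { Timestamp = start.AddSeconds(s) })
            .ToObservable();

        // Prints each chunk as second offsets from start.
        foos.ChunkByTimestamp(TimeSpan.FromSeconds(5))
            .Subscribe(chunk => Console.WriteLine(
                string.Join(", ", chunk.Select(f => (f.Timestamp - start).TotalSeconds))));
    }
}
```

With those five timestamps this should print three lines: "0, 1", "10, 11" and "15". As in note 3, the last chunk only appears once the source completes.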

James World answered Nov 15 '22 00:11


I think James World's answer is neater/more readable, but for posterity, I've found another way to do this using Buffer():

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
        x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks))
    .DistinctUntilChanged().Publish().RefCount();

pub.Buffer(windows, x => windows).Subscribe(x => x.Dump());
pub.Connect();

With 10 million events, James' approach is more than 2.5x as fast (20s vs. 56s on my machine).

Alastair Maw answered Nov 15 '22 00:11