Given a class:
    class Foo { public DateTime Timestamp { get; set; } }
...and an IObservable<Foo>, with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into Lists based on those Timestamps?
I.e. each IList<Foo> should have five seconds of events, or whatever. I know I can use Buffer with a TimeSpan overload, but I need to take the time from the events themselves, not the wall clock. (Unless there is a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)
If I try to use the Observable.Buffer(this IObservable<TSource> source, IObservable<TBufferBoundary> bufferBoundaries) overload like so:
    IObservable<Foo> foos = /* ... */;
    var pub = foos.Publish();
    // Round each timestamp down to the start of its five-second window.
    var windows = pub.Select(x => new DateTime(
        x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();
    pub.Buffer(windows).Subscribe(x => x.Dump()); // linqpad
    pub.Connect();
...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.
E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]].
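To pin down the chunking being asked for, here is a minimal non-reactive sketch of the same rule applied to an in-memory, already ordered sequence. WindowSketch and ChunkByWindow are hypothetical names used only for illustration, and the sketch assumes non-negative values so that integer division rounds down to the window number.

```csharp
using System;
using System.Collections.Generic;

public static class WindowSketch
{
    // Hypothetical helper: splits an ordered sequence into consecutive chunks
    // whose elements share the same window number (value / width).
    // Assumes non-negative values and width > 0.
    public static List<List<long>> ChunkByWindow(IEnumerable<long> orderedValues, long width)
    {
        var chunks = new List<List<long>>();
        long? currentWindow = null;

        foreach (var value in orderedValues)
        {
            long window = value / width;

            // The first element of a new window opens a new chunk,
            // so the boundary item lands in the next chunk, not the previous one.
            if (currentWindow != window)
            {
                chunks.Add(new List<long>());
                currentWindow = window;
            }

            chunks[chunks.Count - 1].Add(value);
        }

        return chunks;
    }
}
```

For instance, WindowSketch.ChunkByWindow(new long[] { 0, 1, 10, 11, 15 }, 5) yields [[0, 1], [10, 11], [15]], which is the grouping wanted from the observable version.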
Here's an idea. The group key condition is the "window number" and I use GroupByUntil. This gives you the desired output in your example (and I've used an int stream just like that example, but you can substitute whatever you need to number your windows).
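    using System;
    using System.Reactive.Linq;
    using Microsoft.Reactive.Testing;

    public class Tests : ReactiveTest
    {
        public void Test()
        {
            var scheduler = new TestScheduler();
            var xs = scheduler.CreateHotObservable<int>(
                OnNext(0, 0),
                OnNext(1, 1),
                OnNext(10, 10),
                OnNext(11, 11),
                OnNext(15, 15),
                OnCompleted(16, 0));
            xs.Publish(ps =>                                     // (1) share the source: it is subscribed more than once
                ps.GroupByUntil(
                    p => p / 5,                                  // (2) group key: the window number
                    grp => ps.Where(p => p / 5 != grp.Key))      // (3) close the group when an item from another window arrives
                .SelectMany(x => x.ToList()))                    // (4) turn each group into a list and flatten
            .Subscribe(list => Console.WriteLine(string.Join(", ", list)));
            scheduler.Start();
        }
    }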
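Note that a window will not close until an element outside it arrives or the source completes. You could merge the termination condition (3) with an Observable.Timer+Select that outputs a null/default instance of your item type to terminate the stream earlier.
This example will run in LINQPad quite nicely if you include the NuGet package rx-testing. New up a Tests instance and just run the Test() method.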
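Adapted to the Foo class from the question, the same GroupByUntil approach might look like the sketch below. BufferByTimestamp and FooBufferExtensions are hypothetical names, not part of Rx; Foo is restated with a public property so the block stands alone, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public class Foo { public DateTime Timestamp { get; set; } }

public static class FooBufferExtensions
{
    // Hypothetical helper (not part of Rx): chunks a stream of Foo into lists
    // covering fixed-width windows of the events' own Timestamp values.
    // Assumes timestamps never decrease and width is positive.
    public static IObservable<IList<Foo>> BufferByTimestamp(
        this IObservable<Foo> source, TimeSpan width)
    {
        // Window number: how many whole `width` intervals precede the timestamp.
        Func<Foo, long> windowOf = f => f.Timestamp.Ticks / width.Ticks;

        return source.Publish(ps =>                 // share one subscription to the source
            ps.GroupByUntil(
                    windowOf,                       // group key is the window number
                    grp => ps.Where(f => windowOf(f) != grp.Key)) // close when another window's item arrives
              .SelectMany(grp => grp.ToList()));    // one list per closed group
    }
}
```

Usage would be along the lines of foos.BufferByTimestamp(TimeSpan.FromSeconds(5)).Subscribe(...). As with the int version, a list is only emitted once an item from a later window arrives or the source completes.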
I think James World's answer is neater/more readable, but for posterity, I've found another way to do this using Buffer():
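    IObservable<Foo> foos = /* ... */;
    var pub = foos.Publish();
    // Round each timestamp down to the start of its five-second window.
    var windows = pub.Select(x => new DateTime(
        x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks))
        .DistinctUntilChanged().Publish().RefCount();
    // Each window start opens a buffer; the next window start closes it.
    pub.Buffer(windows, x => windows).Subscribe(x => x.Dump());
    pub.Connect();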
With 10m events, James' approach is more than 2.5x as fast (20s vs. 56s on my machine).