
Advanceable historical stream and live stream in Rx

I have a hot observable that I normally implement using a plain Subject underneath, so that those interested can subscribe to a live stream of notifications.

Now I would like to keep that live stream, but also expose a historical stream of all the events that have occurred. The notifications should have absolute times attached so that it is known exactly when they happened, and subscribers should be able to advance the historical stream to any point in time before replaying the chronology.

  • I believe most of this could be achieved with a HistoricalScheduler and its AdvanceTo method, but I'm not sure exactly how?
  • And is use of Timestamped to save the times of the events needed?
  • And is a ReplaySubject needed to cache the live stream into historical records which could then be played back using the HistoricalScheduler?

How exactly can those two streams be implemented for the same source? In other words, how can the approach referenced below be adapted to these requirements?

[ see "Replaying the past" heading ]

Cel asked Jan 07 '14 10:01

1 Answer

What the HistoricalScheduler gives you is the ability to control the forward motion of the virtual time of the scheduler.

What you do not get is random access over time. As virtual time is advanced, scheduled actions are executed, so they must be scheduled in advance. Any action scheduled in the past - i.e. at an absolute time that is behind the HistoricalScheduler.Now value - is due immediately and runs as soon as the scheduler next processes its queue.

To replay events, you need to record them somehow, then schedule them using an instance of a HistoricalScheduler - and then advance time.

When you advance time, scheduled actions are executed at their due times - and when observables send OnXXX() to their subscribers, the Now property of the scheduler will have the current virtual time.

Each subscriber will need access to its own scheduler in order to control time independently of other subscribers. This effectively means creating an observable per subscriber.
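A minimal sketch of the "observable per subscriber" idea, assuming the recorded events are held in an in-memory list and reusing the Generate-based replay from this answer. The names PerSubscriberReplay, CreateReplay and Run are hypothetical, as is the hand-built log; only the Rx calls themselves come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PerSubscriberReplay
{
    // Hypothetical helper: builds a replay of the log bound to the caller's own scheduler.
    public static IObservable<T> CreateReplay<T>(
        IList<Timestamped<T>> log, HistoricalScheduler scheduler)
    {
        // Defer gives every subscription a fresh enumerator over the log.
        return Observable.Defer(() => Observable.Generate(
            log.GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Value,
            events => events.Current.Timestamp,
            scheduler));
    }

    // Returns what each of two independent subscribers has seen.
    public static Tuple<List<int>, List<int>> Run()
    {
        // Assumed sample data standing in for a recorded live stream.
        var t0 = new DateTimeOffset(2014, 1, 7, 15, 17, 23, TimeSpan.Zero);
        var log = new List<Timestamped<int>>
        {
            new Timestamped<int>(0, t0),
            new Timestamped<int>(1, t0.AddSeconds(1)),
            new Timestamped<int>(2, t0.AddSeconds(2))
        };

        var schedulerA = new HistoricalScheduler();
        var schedulerB = new HistoricalScheduler();
        var seenByA = new List<int>();
        var seenByB = new List<int>();

        CreateReplay(log, schedulerA).Subscribe(x => seenByA.Add(x));
        CreateReplay(log, schedulerB).Subscribe(x => seenByB.Add(x));

        schedulerA.AdvanceTo(t0);               // A replays up to the first event only
        schedulerB.AdvanceTo(t0.AddSeconds(2)); // B replays the whole log

        return Tuple.Create(seenByA, seenByB);
    }

    public static void Main()
    {
        var seen = Run();
        Console.WriteLine("A saw: " + string.Join(", ", seen.Item1));
        Console.WriteLine("B saw: " + string.Join(", ", seen.Item2));
    }
}
```

Because each subscriber owns its scheduler, advancing one replay has no effect on the other.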

Here is a quick example I knocked up (it should run in LINQPad if you reference the NuGet package rx-main).

First I record a live stream (in a totally non-production way!), collecting the events into a list. As you suggest, use of Timestamp() works well to capture timing:

/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();


Console.WriteLine("Time now is " + DateTime.Now);

Now we can use the HistoricalScheduler combined with cunning use of Generate to schedule events. Note that this approach prevents a ton of scheduled events being queued up in advance - instead we are just scheduling one at a time:

var scheduler = new HistoricalScheduler();

/* set up the scheduling of the recording events */
var replay = Observable.Generate(
    log.GetEnumerator(),
    events => events.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);

Now when we subscribe, you can see that the HistoricalScheduler's Now property has the virtual time of the event:

replay.Subscribe(
    i => Console.WriteLine("Event: {0} happened at {1}", i, scheduler.Now));

Finally we can start the schedule. Using Start() just tries to play all events, as opposed to using AdvanceTo to move to a specific time; it's roughly like doing AdvanceTo(DateTimeOffset.MaxValue):

scheduler.Start();

The output for me was:

Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00
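To stop at a chosen point instead of playing everything, AdvanceTo can be called in steps. The following is a sketch under assumptions: the log is built by hand rather than recorded, and AdvanceToDemo and Run are hypothetical names. It also illustrates the lack of random access: as far as I know, asking the scheduler to move to a time behind its clock throws an ArgumentOutOfRangeException.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class AdvanceToDemo
{
    public static List<string> Run()
    {
        // Assumed sample data: three events, ten seconds apart.
        var t0 = new DateTimeOffset(2014, 1, 7, 15, 17, 23, TimeSpan.Zero);
        var log = new List<Timestamped<string>>
        {
            new Timestamped<string>("a", t0),
            new Timestamped<string>("b", t0.AddSeconds(10)),
            new Timestamped<string>("c", t0.AddSeconds(20))
        };

        var scheduler = new HistoricalScheduler();
        var seen = new List<string>();
        var report = new List<string>();

        Observable.Generate(
            log.GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Value,
            events => events.Current.Timestamp,
            scheduler)
            .Subscribe(v => seen.Add(v));

        // Nothing is delivered until virtual time moves.
        report.Add("before advancing: " + seen.Count + " event(s)");

        // Events due at or before the target time are delivered.
        scheduler.AdvanceTo(t0.AddSeconds(10));
        report.Add("after first advance: " + string.Join(",", seen));

        scheduler.AdvanceTo(t0.AddSeconds(30));
        report.Add("after second advance: " + string.Join(",", seen));

        try
        {
            scheduler.AdvanceTo(t0); // behind the current virtual time
        }
        catch (ArgumentOutOfRangeException)
        {
            report.Add("cannot move backwards");
        }

        return report;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

A subscriber that wants to "rewind" therefore needs a fresh scheduler and a fresh replay observable rather than a backwards AdvanceTo.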

The upshot is that you'll probably end up having to create your own API over this tool to get something to suit your particular purposes. It leaves you a fair bit of work - but is nonetheless pretty powerful stuff.

What's nice is that the live observable and the replayed observable really look no different from each other - provided you remember to always parameterise your scheduler (!) - and so can have the same queries easily run over them, with temporal queries all working with the virtual time of the scheduler.
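As an illustration of parameterising the scheduler, here is a sketch of one query written once and run over a replayed stream. Describe, SchedulerParameterDemo and the hand-built log are hypothetical; the same method could be handed a live source together with Scheduler.Default.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SchedulerParameterDemo
{
    // Hypothetical query: tags each value with the time reported by the supplied scheduler.
    public static IObservable<string> Describe(IObservable<long> source, IScheduler scheduler)
    {
        return source
            .Timestamp(scheduler)
            .Select(x => x.Value + " seen at " + x.Timestamp.ToString("o"));
    }

    public static List<string> Run()
    {
        // Assumed sample data standing in for a recorded live stream.
        var t0 = new DateTimeOffset(2014, 1, 7, 15, 17, 23, TimeSpan.Zero);
        var log = new List<Timestamped<long>>
        {
            new Timestamped<long>(0L, t0),
            new Timestamped<long>(1L, t0.AddSeconds(1))
        };

        var scheduler = new HistoricalScheduler();
        var replay = Observable.Generate(
            log.GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Value,
            events => events.Current.Timestamp,
            scheduler);

        var lines = new List<string>();

        // Replayed: the timestamps come from the scheduler's virtual clock.
        Describe(replay, scheduler).Subscribe(line => lines.Add(line));
        scheduler.Start();

        // Live usage would look like:
        // Describe(Observable.Interval(TimeSpan.FromSeconds(1)), Scheduler.Default)
        return lines;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Had Describe called Timestamp() without a scheduler argument, the replayed events would be stamped with the wall-clock time of the replay rather than the recorded time.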

I've used this to test out new queries over old data to great effect in commercial scenarios.

What it isn't trying to be is a transport control, such as to serve scrolling back and forth through time in a GUI. Typically you run the history in big chunks, storing the output of new queries, and then use this data for subsequent display in a GUI so users can move back and forth at leisure via some other mechanism you provide.

Finally, you don't need ReplaySubject to cache the live stream; but you do need some means of recording events for replay - this could just be an observer that writes to a log.
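A rough sketch of such a recorder, assuming an in-memory list is an acceptable log. RecordedStream, Snapshot and RecorderDemo are hypothetical names. The clock is passed in so that the demo can use a HistoricalScheduler for deterministic timestamps; a real recorder would more likely be given Scheduler.Default.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper: exposes the live stream unchanged and logs timestamped copies.
public sealed class RecordedStream<T> : IDisposable
{
    private readonly List<Timestamped<T>> _log = new List<Timestamped<T>>();
    private readonly object _gate = new object();
    private readonly IDisposable _recording;

    public RecordedStream(IObservable<T> live, IScheduler clock)
    {
        Live = live;
        // The recorder is just one more subscriber to the live stream.
        _recording = live.Timestamp(clock).Subscribe(e =>
        {
            lock (_gate) { _log.Add(e); }
        });
    }

    public IObservable<T> Live { get; private set; }

    // Copy of everything recorded so far, suitable for feeding a replay.
    public IList<Timestamped<T>> Snapshot()
    {
        lock (_gate) { return _log.ToArray(); }
    }

    public void Dispose()
    {
        _recording.Dispose();
    }
}

public static class RecorderDemo
{
    public static IList<Timestamped<int>> Run()
    {
        var t0 = new DateTimeOffset(2014, 1, 7, 15, 17, 23, TimeSpan.Zero);
        var clock = new HistoricalScheduler(t0); // stands in for the real clock
        var subject = new Subject<int>();

        using (var recorded = new RecordedStream<int>(subject, clock))
        {
            subject.OnNext(1);
            clock.AdvanceBy(TimeSpan.FromSeconds(1));
            subject.OnNext(2);
            return recorded.Snapshot();
        }
    }

    public static void Main()
    {
        foreach (var e in Run())
            Console.WriteLine(e.Value + " recorded at " + e.Timestamp.ToString("o"));
    }
}
```

Each call to Snapshot returns an independent copy, so a replay built from it is unaffected by events recorded afterwards.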

James World answered Sep 19 '22 04:09