I have a hot observable that I normally implement using a plain Subject underneath, so that those interested can subscribe to a live stream of notifications.
Now I would like to keep that live stream, but also expose a historical stream of all the events that have occurred so far. The notifications should have absolute times attached so that subscribers know exactly when they happened, and subscribers should be able to advance the historical stream to any point in time before replaying the chronology.
How exactly can those two streams be implemented for the same source, or in other words, how can the below be appropriated to the current requirements?
[ see "Replaying the past" heading ]
What the HistoricalScheduler gives you is the ability to control the forward motion of the virtual time of the scheduler.
What you do not get is random access over time. As virtual time is advanced, scheduled actions are executed, so they must be scheduled in advance. Any action scheduled in the past - i.e. at an absolute time that is behind the HistoricalScheduler.Now value - is executed straight away, as soon as the scheduler runs.
To replay events, you need to record them somehow, then schedule them using an instance of a HistoricalScheduler - and then advance time.
When you advance time, scheduled actions are executed at their due times - and when observables send OnXXX() to their subscribers, the Now property of the scheduler will have the current virtual time.
Each subscriber will need access to its own scheduler in order to control time independently of other subscribers. This effectively means creating an observable per subscriber.
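To make the "one scheduler and one observable per subscriber" idea concrete, here is a rough sketch. CreateReplay is a hypothetical helper name (it is not part of Rx), and the three-item log is made-up sample data. The Defer wrapper is my own addition so that each subscription walks the log with a fresh enumerator:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class Program
{
    // Hypothetical helper: builds a replay of the log bound to the given scheduler.
    static IObservable<T> CreateReplay<T>(IList<Timestamped<T>> log, IScheduler scheduler)
    {
        // Defer so that every subscription gets its own enumerator over the log.
        return Observable.Defer(() => Observable.Generate(
            log.GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Value,
            events => events.Current.Timestamp,
            scheduler));
    }

    public static void Main()
    {
        // Made-up sample recording: three events one second apart.
        var start = new DateTimeOffset(2014, 1, 7, 15, 0, 0, TimeSpan.Zero);
        var log = new List<Timestamped<int>>
        {
            new Timestamped<int>(0, start),
            new Timestamped<int>(1, start.AddSeconds(1)),
            new Timestamped<int>(2, start.AddSeconds(2)),
        };

        // One scheduler per subscriber, so each controls its own virtual time.
        var schedulerA = new HistoricalScheduler();
        var schedulerB = new HistoricalScheduler();

        CreateReplay(log, schedulerA).Subscribe(i => Console.WriteLine("A saw " + i));
        CreateReplay(log, schedulerB).Subscribe(i => Console.WriteLine("B saw " + i));

        // A plays the whole recording; B stops part-way through it.
        schedulerA.Start();
        schedulerB.AdvanceTo(start.AddSeconds(1.5));
    }
}
```

Because the two schedulers are independent, advancing one has no effect on what the other subscriber has seen so far.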
Here is a quick example I knocked up (that would run in LINQPad if you referenced the NuGet package Rx-Main).
First I record a live stream (in a totally non-production way!), recording events into a list. As you suggest, use of Timestamp() works well to capture timing:
/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();
Console.WriteLine("Time now is " + DateTime.Now);
Now we can use the HistoricalScheduler combined with cunning use of Generate to schedule events. Note that this approach prevents a ton of scheduled events being queued up in advance - instead we are just scheduling one at a time:
var scheduler = new HistoricalScheduler();
/* set up the scheduling of the recorded events */
var replay = Observable.Generate(
    log.GetEnumerator(),
    events => events.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);
Now when we subscribe, you can see that the HistoricalScheduler's Now property has the virtual time of the event:
replay.Subscribe(
    i => Console.WriteLine("Event: {0} happened at {1}", i, scheduler.Now));
Finally we can start the scheduler. Using Start() just tries to play all events, as opposed to using AdvanceTo to move to a specific time - it's like doing AdvanceTo(DateTimeOffset.MaxValue):
scheduler.Start();
The output for me was:
Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00
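If a subscriber wants to begin part-way through the recording rather than at the start, one option is to skip the earlier entries and start the scheduler's clock at the chosen instant. This is only a sketch under my own assumptions: the log here is made-up sample data and the variable `from` is a hypothetical starting point chosen by the subscriber.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class Program
{
    public static void Main()
    {
        // Made-up sample recording: five events one second apart.
        var start = new DateTimeOffset(2014, 1, 7, 15, 17, 23, TimeSpan.Zero);
        var log = Enumerable.Range(0, 5)
            .Select(i => new Timestamped<int>(i, start.AddSeconds(i)))
            .ToList();

        // Hypothetical starting point picked by the subscriber.
        var from = start.AddSeconds(2);

        // Start the virtual clock at the chosen instant instead of the default.
        var scheduler = new HistoricalScheduler(from);

        // Only entries at or after the starting point are scheduled.
        var replay = Observable.Generate(
            log.SkipWhile(e => e.Timestamp < from).GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Value,
            events => events.Current.Timestamp,
            scheduler);

        replay.Subscribe(
            i => Console.WriteLine("Event: {0} happened at {1}", i, scheduler.Now));

        // Step forward by one second of virtual time, then play out the rest.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
        Console.WriteLine("Paused at " + scheduler.Now);
        scheduler.Start();
    }
}
```

This still only moves forward: going back to an earlier point means creating a new scheduler and a new replay over the log.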
The upshot is that you'll probably end up having to create your own API over this tool to get something to suit your particular purposes. It leaves you a fair bit of work - but is nonetheless pretty powerful stuff.
What's nice is that the live observable and the replayed observable really look no different from each other - provided you remember to always parameterise your scheduler (!) - and so can have the same queries easily run over them, with temporal queries all working with the virtual time of the scheduler.
I've used this to test out new queries over old data to great effect in commercial scenarios.
What it isn't trying to be is a transport control, such as to serve scrolling back and forth through time in a GUI. Typically you run the history in big chunks, storing the output of new queries, and then use this data for subsequent display in a GUI so users can move back and forth at leisure via some other mechanism you provide.
Finally, you don't need ReplaySubject to cache the live stream; but you do need some means of recording events for replay - this could just be an observer that writes to a log.
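As a rough illustration of "an observer that writes to a log", the sketch below keeps the live Subject as it is and attaches one extra subscription that timestamps each event and appends it to an in-memory list. The names `live` and `log` are hypothetical, and a plain List is an assumption that only holds if events arrive on a single thread; a real recorder would more likely write to durable storage.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Program
{
    public static void Main()
    {
        // The live stream that ordinary subscribers keep using unchanged.
        var live = new Subject<string>();

        // In-memory recording; not thread-safe, which is fine for this sketch only.
        var log = new List<Timestamped<string>>();

        // The recorder is just one more subscriber to the live stream.
        using (live.Timestamp().Subscribe(e => log.Add(e)))
        {
            live.OnNext("first");
            live.OnNext("second");
        }

        // The recorded entries carry the absolute time each event was observed.
        foreach (var e in log)
            Console.WriteLine("{0} recorded at {1}", e.Value, e.Timestamp);
    }
}
```

The resulting list has the same shape as the log used in the example above, so it can be handed straight to the Generate-based replay.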