Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement time expiry hot observable in RxJS (or general in Reactive Extensions)

I'd like to implement Time Expiry cache with RxJs. Here is example of "normal" cache:

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);

//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);

cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

When subscribe on cachedData is called for the first time, "heavy duty job" is called, and after 2 seconds result is saved in cachedData (AsyncSubject). Any other subsequent subscribe on cachedData returns immediately with saved result (thus cache implementation).

What I'd like to achieve is to "spice" this up with time period within cachedData is valid, and when that time passes, I'd like to re-run "heavy duty job" for new data and cache this again for new time period, etc...

Desired behaviour:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);


//let's assume that all code is sequential from here

//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});

//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});

//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});


//....
//etc

I'm new to Rx(Js) and cannot figure out how to implement this hot observable with cooldown.

like image 322
Tomo Avatar asked Oct 20 '14 21:10

Tomo


2 Answers

All you are missing is to schedule a task to replace your cachedData with a new AsyncSubject after a time period. Here's how to do it as a new Rx.Observable method:

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
    var source = this,
        cachedData = undefined;

    // Use timeout scheduler if scheduler not supplied
    scheduler = scheduler || Rx.Scheduler.timeout;

    return Rx.Observable.create(function (observer) {

        if (!cachedData) {
            // The data is not cached.
            // create a subject to hold the result
            cachedData = new Rx.AsyncSubject();

            // subscribe to the query
            source.subscribe(cachedData);

            // when the query completes, start a timer which will expire the cache
            cachedData.subscribe(function () {
                scheduler.scheduleWithRelative(expirationMs, function () {
                    // clear the cache
                    cachedData = undefined;
                });
            });
        }

        // subscribe the observer to the cached data
        return cachedData.subscribe(observer);
    });
};

Usage:

// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);

// the cached query
var cachedData = data.cacheWithExpiration(15000);

// first observer must wait
cachedData.subscribe();

// wait 3 seconds

// second observer gets result instantly
cachedData.subscribe();

// wait 15 seconds

// observer must wait again
cachedData.subscribe();
like image 115
Brandon Avatar answered Oct 06 '22 13:10

Brandon


A simple solution would be to create a custom pipeable operator to repeatWhen a duration has passed. Here's what I came up with:

 export const refreshAfter = (duration: number) => (source: Observable<any>) =>
                                  source.pipe(
                                     repeatWhen(obs => obs.pipe(delay(duration))),
                                     publishReplay(1), 
                                     refCount());

Then I'm using it like this:

const serverTime$ = this.environmentClient.getServer().pipe(map(s => s.localTime))
const cachedServerTime$ = serverTime.pipe(refreshAfter(5000)); // 5s cache

Important note: This uses publishReplay(1), refCount() because shareReplay(1) doesn't unsubscribe from the source observable so it'll keep hitting your server forever. Unfortunately this has the consequence that on error that error will be replayed from the publishReplay(1), refCount(). There's a 'new improved' shareReplay coming soon. See notes here on a similar question. Once this 'new' version is available this answer should be updated - but the beauty of custom operators is you can fix them in one place.

like image 37
Simon_Weaver Avatar answered Oct 06 '22 14:10

Simon_Weaver