I'd like to implement Time Expiry cache with RxJs. Here is example of "normal" cache:
//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);
//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);
cachedData.subscribe(function(data){
//after 2 seconds, result is here and data is cached
//next subscribe returns immediately data
cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});
When subscribe
on cachedData
is called for the first time, "heavy duty job" is called, and after 2 seconds result is saved in cachedData
(AsyncSubject
). Any other subsequent subscribe
on cachedData
returns immediately with saved result (thus cache implementation).
What I'd like to achieve is to "spice" this up with time period within cachedData
is valid, and when that time passes, I'd like to re-run "heavy duty job" for new data and cache this again for new time period, etc...
Desired behaviour:
//pseudo code
cachedData.youShouldExpireInXSeconds(10);
//let's assume that all code is sequential from here
//this is 1.st run
cachedData.subscribe(function (data) {
//this first subscription actually runs "heavy duty job", and
//after 2 seconds first result data is here
});
//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
//this result is cached
});
//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
//i'm expecting same behaviour as it was 1.st run:
// - this runs new "heavy duty job"
// - and after 2 seconds we got new data result
});
//....
//etc
I'm new to Rx(Js) and cannot figure out how to implement this hot observable with cooldown.
All you are missing is to schedule a task to replace your cachedData
with a new AsyncSubject
after a time period. Here's how to do it as a new Rx.Observable
method:
Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
var source = this,
cachedData = undefined;
// Use timeout scheduler if scheduler not supplied
scheduler = scheduler || Rx.Scheduler.timeout;
return Rx.Observable.create(function (observer) {
if (!cachedData) {
// The data is not cached.
// create a subject to hold the result
cachedData = new Rx.AsyncSubject();
// subscribe to the query
source.subscribe(cachedData);
// when the query completes, start a timer which will expire the cache
cachedData.subscribe(function () {
scheduler.scheduleWithRelative(expirationMs, function () {
// clear the cache
cachedData = undefined;
});
});
}
// subscribe the observer to the cached data
return cachedData.subscribe(observer);
});
};
Usage:
// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);
// the cached query
var cachedData = data.cacheWithExpiration(15000);
// first observer must wait
cachedData.subscribe();
// wait 3 seconds
// second observer gets result instantly
cachedData.subscribe();
// wait 15 seconds
// observer must wait again
cachedData.subscribe();
A simple solution would be to create a custom pipeable operator to repeatWhen
a duration has passed. Here's what I came up with:
export const refreshAfter = (duration: number) => (source: Observable<any>) =>
source.pipe(
repeatWhen(obs => obs.pipe(delay(duration))),
publishReplay(1),
refCount());
Then I'm using it like this:
const serverTime$ = this.environmentClient.getServer().pipe(map(s => s.localTime))
const cachedServerTime$ = serverTime.pipe(refreshAfter(5000)); // 5s cache
Important note: This uses publishReplay(1), refCount() because shareReplay(1) doesn't unsubscribe from the source observable so it'll keep hitting your server forever. Unfortunately this has the consequence that on error that error will be replayed from the publishReplay(1)
, refCount()
. There's a 'new improved' shareReplay coming soon. See notes here on a similar question. Once this 'new' version is available this answer should be updated - but the beauty of custom operators is you can fix them in one place.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With