I am using RxJS 5 (5.0.1) to cache in Angular 2. It works well.
The meat of the caching function is:
const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));
The actualFn is the this.http.get('/some/resource') call.
As I say, this is working perfectly for me. The cached value is returned from the observable for the duration of RECACHE_INTERVAL. If a request is made after that interval, actualFn() will be called.
What I am trying to figure out is how to return the last value when RECACHE_INTERVAL expires and actualFn() is called. Between the moment RECACHE_INTERVAL expires and the moment the actualFn() result is replayed, the observable doesn't return a value. I would like to get rid of that gap and always return the last value.
I could use a side effect: store the last good value and call .next(lastValue) while waiting for the HTTP response to return, but this seems naive. I would like to do this the "RxJS" way, with a pure-function solution, if possible.
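To make the gap concrete, here is a minimal plain-JavaScript sketch (no RxJS) of a single-slot buffer with an expiry window. It is only a rough model of what a time-windowed replay hands to a late subscriber, not how RxJS implements it. WindowedSlot, getOrStale, and the 1000 ms interval are hypothetical names and values chosen for illustration.

```javascript
// Single-slot buffer whose value expires after `windowMs`. A rough model of
// what publishReplay(1, RECACHE_INTERVAL) can replay to a late subscriber.
class WindowedSlot {
  constructor(windowMs) {
    this.windowMs = windowMs;
    this.entry = null; // { value, storedAt } once something has been stored
  }

  set(value, now) {
    this.entry = { value, storedAt: now };
  }

  // Returns the value only while it is still inside the window.
  get(now) {
    if (this.entry && now - this.entry.storedAt < this.windowMs) {
      return this.entry.value;
    }
    return undefined;
  }

  // The side-effect workaround: fall back to the last value even if expired.
  getOrStale(now) {
    const fresh = this.get(now);
    if (fresh !== undefined) return { value: fresh, stale: false };
    return this.entry ? { value: this.entry.value, stale: true } : undefined;
  }
}

const RECACHE_INTERVAL = 1000; // hypothetical interval, in ms
const slot = new WindowedSlot(RECACHE_INTERVAL);
slot.set('response-1', 0);

console.log(slot.get(500));         // response-1
console.log(slot.get(1500));        // undefined (the gap)
console.log(slot.getOrStale(1500)); // { value: 'response-1', stale: true }
```

Timestamps are passed in explicitly so the behavior can be checked without real timers.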
Almost any complicated logic quickly gets out of control if you use plain rxjs. I would rather implement a custom cache operator from scratch; you can use this gist as an example.
Updated answer:
If you always want to use the previous value while a new request is being made, then you can put another subject in the chain which keeps the most recent value.
You can then repeat the value so it is possible to tell if it came from the cache or not. The subscriber can then filter out the cached values if they are not interested in those.
// Take values while they pass the predicate, then return one more
// i.e. also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
    .flatMap(v => Observable.from([v, v]))
    .takeWhile((v, index) =>
      index % 2 === 0 ? true : predicate(v, index)
    )
    .filter((v, index) => index % 2 !== 1);
// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);
    return subject.subscribe(subscriber);
  });
const cachedRequest = request
  // Subjects below only store the most recent value
  // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
    {fromCache: false, value: v},
    {fromCache: true, value: v}
  ]))
  // Never complete subject
  .concat(Observable.never())
  // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
  // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));
// Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
// Subscribers will get the most recent cached value first then an updated value
https://acutmore.jsbin.com/kekevib/8/edit?js,console
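The effect of takeWhileInclusive together with the fromCache flag may be easier to see on a plain array. The sketch below is a non-RxJS model with a hypothetical emission order (a stale replayed value, followed by the fresh response and its cached copy). The array version of takeWhileInclusive is an illustrative stand-in, not the operator above.

```javascript
// Plain-array model of takeWhileInclusive: keep items while the predicate
// holds, then also keep the first item that fails it, and stop.
function takeWhileInclusive(items, predicate) {
  const out = [];
  for (const item of items) {
    out.push(item);
    if (!predicate(item)) break;
  }
  return out;
}

// Hypothetical order seen by a subscriber arriving after the cache expired:
// the stale replayed value first, then the fresh response and its cached copy.
const emissions = [
  { fromCache: true, value: 'old' },
  { fromCache: false, value: 'new' },
  { fromCache: true, value: 'new' },
];

const seen = takeWhileInclusive(emissions, v => v.fromCache);
console.log(seen.map(v => v.value)); // [ 'old', 'new' ]

// A subscriber that is not interested in cached values can filter them out.
const freshOnly = seen.filter(v => !v.fromCache);
console.log(freshOnly.map(v => v.value)); // [ 'new' ]
```

The subscriber stops at the first value that is not from the cache, so the trailing cached copy only serves later subscribers.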
Original answer:
Instead of setting a window size on the ReplaySubject, you could change the source observable to repeat after a delay.
const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));
The repeatWhen operator requires RxJS 5.0.0-beta.12 or higher:
https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09