Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactive Caching of HTTP Service

Tags:

I am using RsJS 5 (5.0.1) to cache in Angular 2. It works well.

The meat of the caching function is:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount().take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

The actualFn is the this.http.get('/some/resource').

Like I say, this is working perfectly for me. The cache is returned from the observable for duration of the RECACHE_INTERVAL. If a request is made after that interval, the actualFn() will be called.

What I am trying to figure out is when the RECACHE_INTERVAL expires and the actualFn() is called — how to return the last value. There is a space of time between when the RECACHE_INTERVAL expires and the actualFn() is replayed that the observable doesn't return a value. I would like to get rid of that gap in time and always return the last value.

I could use a side effect and store the last good value call .next(lastValue) while waiting for the HTTP response to return, but this seems naive. I would like to use a "RxJS" way, a pure function solution — if possible.

like image 526
Martin Avatar asked Feb 16 '17 17:02

Martin


2 Answers

Almost any complicated logic quickly goes out of control if you use plain rxjs. I would rather implement custom cache operator from scratch, you can use this gist as an example.

like image 189
kemsky Avatar answered Dec 07 '22 13:12

kemsky


Updated answer:

If always want to use the previous value while a new request is being made then can put another subject in the chain which keeps the most recent value.

You can then repeat the value so it is possible to tell if it came from the cache or not. The subscriber can then filter out the cached values if they are not interested in those.

// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
  .flatMap(v => Observable.from([v, v]))
  .takeWhile((v, index) =>
     index % 2 === 0 ? true : predicate(v, index)
  )
  .filter((v, index) => index % 2 !== 1);

// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);

    return subject.subscribe(subscriber);
  });

const cachedRequest = request
   // Subjects below only store the most recent value
   // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
     {fromCache: false, value: v},
     {fromCache: true, value: v}
   ]))
   // Never complete subject
  .concat(Observable.never())
   // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
   // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));

  // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
  // Subscribers will get the most recent cached value first then an updated value

https://acutmore.jsbin.com/kekevib/8/edit?js,console

Original answer:

Instead of setting a window size on the replaySubject - you could change the source observable to repeat after a delay.

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

The repeatWhen operator requires RxJs-beta12 or higher https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

like image 36
Ashley Avatar answered Dec 07 '22 12:12

Ashley