
RxJS: How to use .shareReplay() to cache HTTP data?

Tags: angular, rxjs

I've been searching for a way to cache HTTP data in Angular, and I finally decided to use .shareReplay(). I'd also like to clear the cache manually, or have it cleared automatically once the ReplaySubject is out of date, because I know I can set a time window for .shareReplay(). What's the best practice for meeting these two requirements?

Tony asked Sep 07 '17 07:09



3 Answers

You can't really use shareReplay to cache an observable, flush the cache, and provide an initial cache seed all at the same time.

  • shareReplay is fine for caching values; it uses a ReplaySubject internally
  • to flush the cache you need some outer control that forces a resubscribe to the underlying observable and resets the ReplaySubject
  • an initial value could be provided with startWith, but that has the problem of emitting multiple values where you expect only one.

I think this case is special enough to warrant a custom operator and a cache controller.

// RxJS 5.x import style. The cache uses AsyncSubject to hold the last value
// and a scheduler as its clock.
const { AsyncSubject, Scheduler } = require('rxjs');

class Cache {
  get isExpired() {
    return this.scheduler.now() > this.expiration;
  }

  constructor(initialValue, maxTime = Number.POSITIVE_INFINITY, scheduler = Scheduler.queue) {
    this.isComplete = false;
    this.hasError = false;
    this.subject = null;
    this.maxTime = maxTime;
    this.scheduler = scheduler;
    this.expiration = Number.POSITIVE_INFINITY;

    if (typeof initialValue !== 'undefined') {
      this.subject = new AsyncSubject();
      this.subject.next(initialValue);
      this.subject.complete();
      this.isComplete = true;
      this.extendExpiration();
    }
  }

  extendExpiration() {
    this.expiration = this.scheduler.now() + this.maxTime;
  }

  preventExpiration() {
    this.expiration = Number.POSITIVE_INFINITY;
  }

  flush() {
    this.expiration = 0;
  }
}

function cachingOperator(cache = new Cache()) {
  let refCount = 0;
  let subscription;
  return function cacheOperation(source) {
    refCount++;
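    if (!cache.subject || cache.hasError || cache.isExpired) {
      // No usable cached value: start a fresh request.
      cache.hasError = false;
      cache.isComplete = false; // the new request has not completed yet
      cache.preventExpiration(); // prevent expiration before underlying observable completes
      cache.subject = new AsyncSubject();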
      subscription = source.subscribe({
        next(value) { cache.subject.next(value); },
        error(err) {
          cache.hasError = true;
          cache.subject.error(err);
        },
        complete() {
          cache.isComplete = true;
          cache.extendExpiration();
          cache.subject.complete();
        },
      });
    }
    const innerSub = cache.subject.subscribe(this);
    return () => {
      refCount--;
      innerSub.unsubscribe();
      if (subscription && refCount === 0 && cache.isComplete) {
        subscription.unsubscribe();
      }
    };
  };
}

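// lift() invokes the operator as operator.call(subscriber, source), so a plain
// function works here and `this` inside cacheOperation is the subscriber.
const caching = cache => source => source.lift(cachingOperator(cache));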

The Cache object holds the state. Your code is responsible for instantiating it, which gives you full control.

Usage:

const myCache = new Cache({ result: 'initialValue' }, 1000); // initial value and cache time 1s

const request = httpService.get(url)
  .pipe(
    caching(myCache)
  );

request.subscribe(); // initial value

setTimeout(() => {
  request.subscribe(); // issues new request
  request.subscribe(); // served from the cache, no extra request
  myCache.flush();
  request.subscribe(); // new request
}, 1100);

The API could definitely be improved and tailored to your use case, but I hope this gives a rough idea of how to approach it.
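The expiry rules used by the Cache class above can be checked in isolation by swapping the scheduler for a controllable clock. The following is a minimal sketch, not part of the original answer: Clock, FakeClock and ExpiryTracker are hypothetical names, and ExpiryTracker copies only the isExpired, extendExpiration and flush logic, leaving out the subject handling.

```typescript
// Hypothetical helper types for illustration; only now() is needed from a scheduler.
interface Clock {
  now(): number;
}

// A clock that only moves when told to, so expiry can be tested without waiting.
class FakeClock implements Clock {
  private current = 0;
  now(): number { return this.current; }
  advance(ms: number): void { this.current += ms; }
}

// Mirrors the expiration bookkeeping of the Cache class, nothing else.
class ExpiryTracker {
  private expiration = Number.POSITIVE_INFINITY;
  private readonly maxTime: number;
  private readonly clock: Clock;

  constructor(maxTime: number, clock: Clock) {
    this.maxTime = maxTime;
    this.clock = clock;
  }

  get isExpired(): boolean {
    return this.clock.now() > this.expiration;
  }

  extendExpiration(): void {
    this.expiration = this.clock.now() + this.maxTime;
  }

  flush(): void {
    this.expiration = 0;
  }
}

const clock = new FakeClock();
const tracker = new ExpiryTracker(1000, clock);

tracker.extendExpiration();             // value stored at t = 0, valid until t = 1000
clock.advance(1000);
const atLimit = tracker.isExpired;      // comparison is strict, so t = 1000 is still valid
clock.advance(1);
const pastLimit = tracker.isExpired;    // t = 1001 is past the limit
tracker.extendExpiration();             // refreshed at t = 1001
const afterRefresh = tracker.isExpired;
tracker.flush();
const afterFlush = tracker.isExpired;   // expiration is 0 and now() is 1001
```

One detail this makes visible: flush() sets the expiration to 0, so it marks the entry as expired only while now() is greater than 0. That always holds for a wall-clock scheduler, but a fake clock that starts at 0 has to be advanced first.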

m1ch4ls answered Oct 23 '22 09:10



You should use startWith instead, because it allows you to initialize your data from a local cache and later replace it with more up-to-date data.
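As a rough illustration of that idea, assuming RxJS 6 or later import paths: locallyCached and fresh$ below are hypothetical stand-ins for a locally stored value and an HTTP call such as HttpClient.get().

```typescript
import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';

// Hypothetical stand-ins: a value kept locally and an observable of fresh data.
const locallyCached = { name: 'cached' };
const fresh$ = of({ name: 'fresh' });

// Subscribers receive the local value first and the fresh value afterwards.
const emissions: string[] = [];
fresh$
  .pipe(startWith(locallyCached))
  .subscribe(value => emissions.push(value.name));
```

Note that every subscriber sees two emissions this way, so consumers need to tolerate the value changing after the first one arrives.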

Mike Tung answered Oct 23 '22 07:10



I saw this question while searching for a solution to a similar problem. I wasn't able to get it to work with just the shareReplay API, but wanted to share an RxJS-only solution (in TypeScript) in case anyone can use it:

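     import { NEVER, Observable, Subject, merge, timer } from 'rxjs';
     import { shareReplay, take, takeUntil } from 'rxjs/operators';

     // Members of the caching service class (imports above assume RxJS 6+).
     private cache: { [key: string]: { [prop: string]: { call$: Observable<any>, clear: Subject<void> } } } = {};

     private cacheProperty(key: string, prop: string, httpCall$: Observable<any>, expirationTime?: number): Observable<any> {
        if (!(key in this.cache && prop in this.cache[key])) {
          // Without an expiration time the entry lives until it is cleared manually;
          // timer() with no due time defaults to 0 and would clear it almost immediately.
          const timer$ = expirationTime === undefined ? NEVER : timer(expirationTime)
          const clearCache = new Subject<void>()
          // Emits once, on whichever comes first: the timeout or a manual clear.
          const clearCache$ = merge(timer$, clearCache.asObservable()).pipe(take(1))
          // shareReplay(1, 5000) replays the latest value for up to 5000 ms.
          const cacheProp$ = httpCall$.pipe(shareReplay(1, 5000), takeUntil(clearCache$))
          clearCache$.subscribe(_ => delete this.cache[key][prop])
          this.cache[key] = Object.assign(this.cache[key] || {}, { [prop]: { call$: cacheProp$, clear: clearCache } })
        }
        return this.cache[key][prop].call$
      }

      public clearPropCache(key: string, prop: string) {
        // Do nothing when there is no cached entry for this key and prop.
        const entry = this.cache[key] && this.cache[key][prop]
        if (entry) { entry.clear.next() }
      }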

It is tailored to our project, which requires multiple "user" caches (split by key) as well as multiple types of cache per user (for example, bio), denoted by prop. You can pass in an expiration time, and you can also clear an entry manually through the void Subject, which is what the second method does.

We are also using Angular, so httpCall$ should be an observable returned from HttpClient; I haven't tried it with other sources.

I also found this article very helpful in creating and slimming down my solution: https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html

Hope it helps anyone else working on simple caching solutions!

bluwaterdogz answered Oct 23 '22 09:10
