 

RxJs puzzle: Share a stream and conditionally retry it on new subscription

Tags:

angular

rxjs

Context:

Our web app can display different help panels at the same time.
When a panel requires a given ID (for example help-id-1), we call an API with that ID and get back the corresponding help content.

For various reasons, we might be displaying the same help panel two or more times. But of course we don't want to hit the API more than once for the same item if it was already fetched without error OR is currently being fetched.

Our "producer" gives us a cold stream to retrieve it:

const getHelpContentById = (id: string) => fromPromise(
  httpCallToGetHelpResultFromThirdLib(id)
).pipe(
  // extracting the body of the response
  map(getHelpItemFromResponse),
  // wrapping the response into an object { status: 'SUCCESS', item, id }
  map(item => setStatusOnHelpItem(item, id)),
  // after the maps, so the ERROR object is not passed through them
  catchError(error => of({ status: 'ERROR', item: null, id })),
  startWith({ status: 'LOADING', item: null, id }),
)

We start the stream with an object containing a status, and later we receive another object with a new status which is either SUCCESS or ERROR.

The expected solution should:
- Fetch from the API on the first call for a given ID
- If another subscription happens for the same ID before the previous one is done (status LOADING), it should get the same stream as the first call without calling the API again
- If a part of the app displaying help-id-1 fails to fetch it, the stream should not be closed; instead it should emit a value of type { status: 'ERROR', item: null, id }. That way, if another component tries to display help-id-1 again, since the last status for that ID is ERROR, it should call the API once more, and both subscribers should receive the live update { status: 'LOADING', item: null, id } followed by either an error or a success
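These requirements boil down to one decision made whenever a new subscriber asks for an ID. A minimal sketch, assuming a hypothetical `HelpItemWithStatus` shape that mirrors the objects above and a hypothetical `shouldFetch` helper:

```typescript
// Hypothetical shape mirroring the { status, item, id } objects in the question.
type HelpStatus = 'LOADING' | 'SUCCESS' | 'ERROR';

interface HelpItemWithStatus<T = unknown> {
  status: HelpStatus;
  item: T | null;
  id: string;
}

// `last` is the most recent value emitted for an id, or undefined if it was never requested.
// LOADING: join the in-flight request. SUCCESS: reuse the cached item.
// ERROR (or never requested): call the API, and every subscriber should see the new LOADING.
function shouldFetch(last?: HelpItemWithStatus): boolean {
  return last === undefined || last.status === 'ERROR';
}
```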

First try:
Here's the first, not very RxJS-like, way of doing it I came up with:
(code extracted from a service, which is a class)

private helpItems: Map<
  string,
  { triggerNewFetchForItem: () => void; obs: Observable<HelpItemWithStatus> }
> = new Map();

private getFromCacheOrFetchHelpItem(id: string): Observable<HelpItemWithStatus> {
  let triggerNewFetchForItem$: BehaviorSubject<HelpItemWithStatus>;
  const idNotInCache = !this.helpItems.has(id);

  if (idNotInCache) {
    triggerNewFetchForItem$ = new BehaviorSubject<HelpItemWithStatus>(null);

    this.helpItems.set(id, {
      triggerNewFetchForItem: () => triggerNewFetchForItem$.next(null),
      obs: triggerNewFetchForItem$.pipe(
        switchMap(() => getHelpContentById(id)),
        shareReplay(1),
      ),
    });

    return this.helpItems.get(id).obs;
  } else {
    return this.helpItems.get(id).obs.pipe(
      tap(item => {
        if (item.status === ContentItemStatus.ERROR) {
          this.helpItems.get(id).triggerNewFetchForItem();
        }
      })
    );
  }
}

public getHelpItemById(id: string): Observable<HelpItemWithStatus> {
  return this.getFromCacheOrFetchHelpItem(id);
}

Attempt from my co-worker:

private getFromCacheOrFetchHelpItem4(id: string): Observable<HelpItemWithStatus> {
  const cachedItem = this.items.get(id);

  if (cachedItem && cachedItem.status !== ContentItemStatus.ERROR) {
    return of(cachedItem);
  }

  return getNewWrappedHelpItem(this.contentfulClient, id).pipe(
    // remember the latest status for this id
    tap(item => this.items.set(id, item)),
    shareReplay(1),
  );
}

Problem:
- you subscribe to it once, and it ends up with an error
- you subscribe to it again from a new component
- it does another fetch, as expected, BUT returns a new stream reference
- the API call succeeds: the second component is updated, the first one is not

Conclusion: I'm sure there's a much better Rx way of doing this, maybe even without relying on an "external cache" (the Map here). Ideally there would be a new operator for that :)

asked Jan 29 '23 13:01 by maxime1992



2 Answers

I have basically this same scenario at work. I've been thinking about it for a while but finally decided to take a whack at it. My solution is a bit rough, but I'll try to update the answer as I refine it. I'd love feedback on better ways to approach it.

Problems/Steps

It is a rather complicated problem so I'll build it out step by step. We will start with an api method with no parameters for simplicity.

Sharing the stream so that multiple subscribers don't trigger multiple api requests

Just add the share() operator at the end to make it multicast.

return api().pipe(share());
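A small sketch of what share() buys you, assuming a hypothetical api() whose response is driven by a Subject, so the request stays in flight until the "server" answers (written with RxJS 6+ import paths):

```typescript
import { Observable, Subject, defer } from 'rxjs';
import { share } from 'rxjs/operators';

// Hypothetical stand-in for the HTTP call: `response$` plays the server,
// and `apiCalls` counts how many times the request was actually made.
const response$ = new Subject<string>();
let apiCalls = 0;
const api = (): Observable<string> =>
  defer(() => {
    apiCalls++;
    return response$;
  });

const shared$ = api().pipe(share());

const received: string[] = [];
shared$.subscribe(v => received.push(`A:${v}`)); // starts the request
shared$.subscribe(v => received.push(`B:${v}`)); // joins the in-flight request

// The "server" responds once; both subscribers get the value.
response$.next('help');
response$.complete();
```

Once the source completes, share() resets, so a subscriber arriving later would trigger a new call; that is the gap shareReplay(1) fills.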

Sharing the last response from the api

Just change share() to shareReplay(1). The parameter indicates the number of previous responses to share. We only want the last one emitted so we put 1.

Alternatively you could use the tap operator to keep a reference to the last emitted value and do of(data) instead of returning the stream if the last one was successful. This only applies if you never want to invoke the api again (like the OP talked about) but I'm keeping it generic to be flexible for other solutions.

return api().pipe(shareReplay(1));
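To see the replay, here is a sketch with a hypothetical synchronous api() that counts how often it runs:

```typescript
import { Observable, defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical api(): answers synchronously and counts how often it runs.
let apiCalls = 0;
const api = (): Observable<string> =>
  defer(() => {
    apiCalls++;
    return of('help');
  });

const cached$ = api().pipe(shareReplay(1));

const first: string[] = [];
const late: string[] = [];
cached$.subscribe(v => first.push(v)); // runs the request
cached$.subscribe(v => late.push(v));  // request already finished: last value replayed, no new call
```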

Allow a new subscription to trigger a new api request if the last one was not successful

This is a doozy. It is easy to get the last value or even to rerun the stream for a new subscriber. But that doesn't benefit the previous subscribers. You might get a successful result but none of the previous subscribers will be notified. Essentially what you are asking for is to have your subject emit a new value when it has a new subscription. As far as I am aware, that isn't possible.

My workaround was to set up my own subject that I could trigger each time someone requested the stream. It isn't the same thing, but I think it is what we really want anyway: some way to retry. If it isn't automated with the retryWhen operator, then we want a manual trigger, like a new component loading. When a new component loads it requests the stream, so this works fine.

So we create a subject and call next in a timeout. I would much rather use a ReplaySubject or BehaviorSubject to avoid the timeout but I ran into issues with angular change detection when I did that (ExpressionChangedAfterItHasBeenCheckedError). I'll need to look deeper into it.

Note that the share is on the outer stream. We want to share that rather than the inner one. Also note that I am using switchMap rather than switchMapTo since we want a new inner stream each time.

const trigger = new Subject<void>();
setTimeout(() => trigger.next());
return trigger.pipe(
  switchMap(() => api()),
  shareReplay(1)
);

Keep the stream alive after an error

The catchError operator lets you return an observable. Since we want that to be a message we just do catchError(e => of(e)). The problem is that this ends the stream. The fix is to put the catch inside of the switchMap so that the inner stream can die and the outer one can keep going.

return trigger.pipe(
  switchMap(() => api().pipe(
    // caught on the inner stream: only this request dies, the outer stream keeps going
    catchError(err => of(err))
  )),
  shareReplay(1)
);
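A sketch of why the placement matters, assuming a hypothetical api() that fails on its first call and succeeds afterwards; each trigger.next() stands in for a new component requesting the stream:

```typescript
import { Observable, Subject, defer, of } from 'rxjs';
import { switchMap, catchError, shareReplay } from 'rxjs/operators';

// Hypothetical api(): the first call fails, later calls succeed.
let apiCalls = 0;
const api = (): Observable<string> =>
  defer(() =>
    ++apiCalls === 1
      ? new Observable<string>(subscriber => subscriber.error(new Error('boom')))
      : of('help')
  );

const trigger = new Subject<void>();
const stream$ = trigger.pipe(
  // catchError is on the inner observable, so a failure only ends that attempt
  switchMap(() => api().pipe(catchError(err => of(`error: ${err.message}`)))),
  shareReplay(1)
);

const seen: string[] = [];
let completed = false;
stream$.subscribe({
  next: v => seen.push(v),
  complete: () => { completed = true; },
});

trigger.next(); // first attempt fails, the outer stream survives
trigger.next(); // a new request succeeds
```

If catchError were placed after switchMap instead, the first failure would swap the whole stream for of(...), which completes, and the second trigger.next() would reach nobody.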

Know the state of the api

For this we will create a generic response wrapper that has a state property. The possible values are 'FETCHING', 'SUCCESS', and 'FAILURE'. We will use the startWith operator to send the fetching notification as soon as the api call begins (hence why it is at the end).

return trigger.pipe(
  switchMap(() => api().pipe(
    map((data) => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err })),
    startWith({ state: 'FETCHING' })
  )),
  shareReplay(1)
);

Only allow one in-flight request at a time

Basically we want to not invoke the trigger if a request is in-flight. We could either do this with a flag or using the distinct operator with a trigger to reset it when the api call resolves. This second method is tricky since you would need a reference to the stream while constructing it. So we will just use a variable and could either wrap the trigger.next() in an if or put a filter on the stream. I'm going to do a filter.

private state: string;
...
return trigger.pipe(
  filter(() => this.state !== 'FETCHING'),
  switchMap(() => api().pipe(
    map((data) => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err })),
    startWith({ state: 'FETCHING' }),
    tap(x => { this.state = x.state; })
  )),
  shareReplay(1)
);

Only retry on error, not success

All you have to do for this is to not call the trigger. So just change the condition on the trigger to be when state is not initialized or 'FAILURE'.

...
filter(() => this.state == null || this.state === 'FAILURE'),
...
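Putting the filter, the status wrapper and the state variable together, here is a synchronous sketch outside a class (a plain `state` variable replaces `this.state`), with a hypothetical api() that fails once:

```typescript
import { Observable, Subject, defer, of } from 'rxjs';
import { filter, switchMap, map, catchError, startWith, tap, shareReplay } from 'rxjs/operators';

type State = 'FETCHING' | 'SUCCESS' | 'FAILURE';
interface Status { state: State; data?: string; err?: unknown; }

// Hypothetical api(): the first call fails, later calls succeed.
let apiCalls = 0;
const api = (): Observable<string> =>
  defer(() =>
    ++apiCalls === 1
      ? new Observable<string>(subscriber => subscriber.error(new Error('boom')))
      : of('help')
  );

// A plain variable plays the role of `this.state` outside a class.
let state: State | undefined;
const trigger = new Subject<void>();
const stream$ = trigger.pipe(
  // start a request only if we never fetched or the last attempt failed
  filter(() => state === undefined || state === 'FAILURE'),
  switchMap(() => api().pipe(
    map((data): Status => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err } as Status)),
    startWith({ state: 'FETCHING' } as Status),
    tap(x => { state = x.state; })
  )),
  shareReplay(1)
);

const seen: State[] = [];
stream$.subscribe(x => seen.push(x.state));

trigger.next(); // FETCHING, then FAILURE
trigger.next(); // last state was FAILURE: retried, FETCHING then SUCCESS
trigger.next(); // last state was SUCCESS: filtered out, no new call
```

Because this api() answers synchronously, the FETCHING guard never gets to block anything here; with a real HTTP call, a trigger arriving while the state is FETCHING would be filtered out.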

Share streams for the same parameter

You basically just need to hash the parameters and use that as a key for the map. See the full example below.
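One simple way to build the key, assuming plain JSON-safe parameters, is to JSON.stringify the argument list. `generateKey` matches the parameter name used in the solution; `getOrCreate` and `Entry` are hypothetical, and the cached value is a placeholder object rather than a real stream:

```typescript
// Hypothetical key function: JSON.stringify of the argument list works for
// JSON-safe parameters; replace it if your parameters are not plain data.
const generateKey = (...params: unknown[]): string => JSON.stringify(params);

// Placeholder for the per-key entry; in the real solution this holds the trigger and stream.
interface Entry { id: number; }

const cache = new Map<string, Entry>();
let created = 0;

// Hypothetical helper: returns the existing entry for these params or creates one.
function getOrCreate(...params: unknown[]): Entry {
  const key = generateKey(...params);
  let entry = cache.get(key);
  if (!entry) {
    entry = { id: ++created };
    cache.set(key, entry);
  }
  return entry;
}
```

JSON.stringify is sensitive to property order, so { a: 1, b: 2 } and { b: 2, a: 1 } produce different keys; normalize objects first if that matters.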

Solution

Here it is all put together. I created a helper function that will generate an api method. The developer has to provide the api method and the hashing method for the parameters, since inferring those would be overly complicated.

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, switchMap, map, startWith, catchError, shareReplay, filter } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';

// possible states of the api request
export enum ApiStateType {
  Fetching,
  Success,
  Failure
}

// wrapper for the api status messages
export interface ApiStatus<T> {
  state: ApiStateType;
  params: any[],
  data: T
}

// information related to a stream for a unique set of parameters
interface StreamConfig<T> {
  state: ApiStateType;
  trigger: Subject<void>;
  stream: Observable<ApiStatus<T>>;
}

export function generateCachedApi<T>(
  api: (...params) => Observable<T>,
  generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
  const cache = new Map<string, StreamConfig<T>>();

  return (...params): Observable<ApiStatus<T>> => {
    const key = generateKey(...params);
    let config = cache.get(key);

    if (!config) {
      console.log(`created new stream (${key})`);
      config = <StreamConfig<T>> { trigger: new Subject<void>() };
      config.stream = config.trigger.pipe(
        filter(() => config.state == null || config.state === ApiStateType.Failure),
        switchMap(() => {
          return api(...params).pipe(
            map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, params, data })),
            catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, params, data })),
            startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, params }),
            tap(x => { config.state = x.state; })
          );
        }),
        tap(x => { console.log('PUBLISH', x)}),
        shareReplay(1),
      );
      cache.set(key, config);
    } else {
      console.log(`returned existing stream (${key})`);
    }
    setTimeout(() => { config.trigger.next() });
    return config.stream;
  }
}

Here is a running example I hacked together: https://stackblitz.com/edit/api-cache

> I'm sure that there's a way better Rx way of doing that, maybe even without relying on an "external cache" (the Map here). The best would be obviously to have a new operator for that :)

I created a cacheMap operator to try to do just that. I had a source that emitted the api parameters and the cacheMap operator would find or create the stream for the unique set of parameters and would return it mergeMap style. The problem is that every subscriber would now be subscribed to that inner observable. So you have to add a filter (see alt. solution below).

Alternate Solution

Here is an alternate solution that I thought of. Instead of maintaining multiple streams you could have one primary stream and give it to subscribers with a filter.

The problem with a single stream is that the replay would apply to all parameters. So you either have to use replay with no buffer or manage replay on your own.

If you use replay with no buffer, it will replay everything from FETCHING through SUCCESS, which may cause extra processing, though it probably won't be noticeable to the user. Ideally we would have a replayByKey operator, but I haven't gotten around to writing it, so for now I am just using a map. The issue with the map is that we still emit the same value to subscribers who already received it, so we add a distinctUntilChanged operator to the instance stream. Alternatively, you could create the instance stream and put a takeUntil on it, with the notifier being the instance stream filtered for success and delayed with delay(0) so the last value gets through the pipe before the stream closes. This would complete the stream, which is OK since you never get a new value on it once you have a success. I went with distinctUntilChanged because it still lets you get a new value if you change the requirements later.
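The distinctUntilChanged trick works because, without a comparator, it compares by reference: the cached status is re-emitted as the very same object, so subscribers who already saw it drop it, while a freshly created status passes. A quick sketch with a plain Subject and a hypothetical trimmed-down `Status` type:

```typescript
import { Subject } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Hypothetical status shape, trimmed down from ApiStatus<T>.
interface Status { key: string; state: string; }

const stream$ = new Subject<Status>();
const cachedSuccess: Status = { key: 'help-id-1', state: 'SUCCESS' };

const seen: Status[] = [];
stream$.pipe(distinctUntilChanged()).subscribe(s => seen.push(s));

stream$.next(cachedSuccess); // first time: delivered
stream$.next(cachedSuccess); // same object again (like of(apiStatus)): suppressed
stream$.next({ key: 'help-id-1', state: 'SUCCESS' }); // equal content, new object: delivered
```

If the status objects were recreated on every emission, you would need a comparator instead, e.g. distinctUntilChanged((a, b) => a.state === b.state).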

We use a mergeMap instead of a switchMap since we can have concurrent in-flight requests for different parameters and we don't want to cancel requests for different parameters.
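The difference between the two flattening operators shows up with two requests that are both still in flight; the per-id Subjects are hypothetical stand-ins for HTTP responses:

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical in-flight responses for two different parameter sets.
const a$ = new Subject<string>();
const b$ = new Subject<string>();
const responseFor = (id: string) => (id === 'a' ? a$ : b$);

const ids$ = new Subject<string>();
const merged: string[] = [];
const switched: string[] = [];

// mergeMap keeps every inner request subscribed
ids$.pipe(mergeMap(id => responseFor(id))).subscribe(v => merged.push(v));
// switchMap unsubscribes from the previous inner request when a new id arrives
ids$.pipe(switchMap(id => responseFor(id))).subscribe(v => switched.push(v));

ids$.next('a'); // request for "a" starts in both pipelines
ids$.next('b'); // request for "b" starts; switchMap drops "a"

a$.next('result a'); // "a" answers late
b$.next('result b');
```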

Here is the solution:

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, mergeMap, map, startWith, catchError, share, filter, distinctUntilChanged } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';

// possible states of the api request
export enum ApiStateType {
  Fetching,
  Success,
  Failure
}

// wrapper for the api status messages
export interface ApiStatus<T> {
  state: ApiStateType;
  key: string;
  params: any[];
  data: T;
}

export function generateCachedApi<T>(
  api: (...params) => Observable<T>,
  generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
  const trigger = new Subject<any[]>();
  const stateCache = new Map<string, ApiStatus<T>>();
  const stream = trigger.pipe(
    map<any[], [any[], string]>((params) => [ params, generateKey(...params) ]),
    tap(([_, key]) => {
      if (!stateCache.has(key)) {
        stateCache.set(key, <ApiStatus<T>> {})
      }
    }),
    mergeMap(([params, key]) => {
      const apiStatus = stateCache.get(key);
      if (apiStatus.state === ApiStateType.Fetching || apiStatus.state === ApiStateType.Success) {
        return of(apiStatus);
      }
      return api(...params).pipe(
        map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, key, params, data })),
        catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, key, params, data })),
        startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, key, params }),
        tap(state => { stateCache.set(key, state); })
      )
    }),
    tap(x => { console.log('PUBLISH', x)}),
    share()
  );

  return (...params): Observable<ApiStatus<T>> => {
    const key = generateKey(...params);
    const instanceStream = stream.pipe(
      filter((response) => response.key === key),
      distinctUntilChanged()
    );
    setTimeout(() => { trigger.next(params) });
    return instanceStream;
  }
}
answered Jan 31 '23 04:01 by bygrace



Let's separate our concerns and deal with them one at a time:

  1. Transform emitted items into metadata of {id, status, item} with an initial value instead of allowing errors, much like RxJS's materialize()
  2. Cache items until a certain condition becomes true (like an error)
  3. Subscribing to the right api response

1. Transform to metadata

We'll make a higher-order function that takes your getHelpContentById() and returns an observable of metadata objects that first emits 'LOADING', then 'SUCCESS' or 'ERROR' depending on the response:

const toMeta = fetch => id =>
  fetch(id)
    .pipe(
      map(response => ({id, status: 'SUCCESS', item: response})),
      catchError(e => [{id, status: 'ERROR',   item: null}]),
      startWith(       {id, status: 'LOADING', item: null}),
    )
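A typed version of the same higher-order function, assuming a hypothetical `Meta<T>` type and a hypothetical `fakeFetch` that fails for one id (RxJS 6+ imports; the parameter is renamed `fetchItem` so it does not shadow the global fetch):

```typescript
import { Observable, of } from 'rxjs';
import { map, catchError, startWith } from 'rxjs/operators';

// Hypothetical metadata shape: { id, status, item }, as in the question.
interface Meta<T> {
  id: string;
  status: 'LOADING' | 'SUCCESS' | 'ERROR';
  item: T | null;
}

// Wraps a fetch function so it emits LOADING, then SUCCESS or ERROR, and never errors.
function toMeta<T>(fetchItem: (id: string) => Observable<T>) {
  return (id: string): Observable<Meta<T>> =>
    fetchItem(id).pipe(
      map(response => ({ id, status: 'SUCCESS', item: response } as Meta<T>)),
      catchError(() => of({ id, status: 'ERROR', item: null } as Meta<T>)),
      startWith({ id, status: 'LOADING', item: null } as Meta<T>)
    );
}

// Hypothetical fetch: fails for 'help-id-bad', succeeds for anything else.
const fakeFetch = (id: string): Observable<string> =>
  id === 'help-id-bad'
    ? new Observable<string>(subscriber => subscriber.error(new Error('404')))
    : of(`content for ${id}`);

// Collects the statuses emitted for one id.
const statuses = (id: string): string[] => {
  const out: string[] = [];
  toMeta(fakeFetch)(id).subscribe(meta => out.push(meta.status));
  return out;
};
```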

2. Cache items until a condition (e.g. 'ERROR')

Let's keep the caching ignorant of:

  • how to fetch data from the api
  • how to determine if data needs to be uncached

Both bits of logic can be passed in as functions to our caching operator:

import { pipe, Subject } from 'rxjs'
import { mergeMap, tap, shareReplay } from 'rxjs/operators'

/**
 * `predicate` returns a boolean
 * `action`    returns an observable
 */
const cacheUntil = (predicate, action) => {
  const cache = new Map()

  // Use our predicate to check if we should uncache an item
  //
  const cacheUpdate = key => cachedValue => {
    if (predicate(cachedValue)) cache.delete(key)
  }

  // Prep an item, then cache it
  //
  const cache_set = key => {
    const out$ = action(key)   // fetching it in our case
      .pipe(
        tap(cacheUpdate(key)), // see cacheUpdate above this function
        shareReplay(1)         // make Observable returned from `action` 
                               // a Hot observable
                               // so that we don't fetch 
                               // on new subscriptions
      )
    cache.set(key, out$)
    return out$
  }

  // Glue it all together
  //
  return pipe(
    mergeMap(key => // flatten the cached Observable returned from `action()`
      cache.has(key)
        ? cache.get(key)
        : cache_set(key)
    )
  )
}

// make a stream of help ids to fetch from api
// that we can call `next(helpId)` on
const helpId$ = new Subject()

// use a higher-order function to make our apiCall
// return our metadata object of `{id, status, item}`
const help$ = helpId$.pipe(
  cacheUntil(hasMetaError, toMeta(getHelpContentById))
)

And the predicate for our first parameter, fairly self-explanatory:

const hasMetaError = meta => 
  meta.status === 'ERROR'

With that, we can push help ids through a subject and get an observable of cached api calls:

const helpId$ = new Subject()

const help$ = helpId$.pipe(
  cacheUntil(hasMetaError, toMeta(getHelpContentById))
)

// When we need a panel...
//
const addPanel = (panel, helpId) => {
  help$.subscribe(data => panel.display(data))
  helpId$.next(helpId)
}
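To check the retry behavior end to end, here is a typed variant of cacheUntil (not the answer's exact code) with a hypothetical fetchMeta that returns ERROR on the first call for an id and SUCCESS afterwards; the `Meta` type and the fake API are assumptions:

```typescript
import { Observable, OperatorFunction, Subject, of } from 'rxjs';
import { mergeMap, tap, shareReplay } from 'rxjs/operators';

// Keep the shared observable for a key until `predicate` matches one of its
// values, then evict it so the next request for that key refetches.
function cacheUntil<K, V>(
  predicate: (value: V) => boolean,
  action: (key: K) => Observable<V>
): OperatorFunction<K, V> {
  const cache = new Map<K, Observable<V>>();
  const create = (key: K): Observable<V> => {
    const out$ = action(key).pipe(
      tap(value => { if (predicate(value)) cache.delete(key); }),
      shareReplay(1)
    );
    cache.set(key, out$);
    return out$;
  };
  return mergeMap((key: K) => cache.get(key) ?? create(key));
}

// Hypothetical metadata and API: the first call for an id returns ERROR, later calls SUCCESS.
interface Meta { id: string; status: string; }
const calls = new Map<string, number>();
const fetchMeta = (id: string): Observable<Meta> => {
  const n = (calls.get(id) ?? 0) + 1;
  calls.set(id, n);
  return of({ id, status: n === 1 ? 'ERROR' : 'SUCCESS' });
};

const helpId$ = new Subject<string>();
const help$ = helpId$.pipe(cacheUntil((m: Meta) => m.status === 'ERROR', fetchMeta));

const panel1: string[] = [];
help$.subscribe(m => panel1.push(m.status));
helpId$.next('help-id-1'); // ERROR: the entry is evicted

const panel2: string[] = [];
help$.subscribe(m => panel2.push(m.status));
helpId$.next('help-id-1'); // refetched; panel 1 sees the SUCCESS too
helpId$.next('help-id-1'); // cached: no API call, but both panels get it again
```

The second next() refetches because the ERROR entry was evicted, and panel 1 receives the SUCCESS even though panel 2 caused it. The third next() does not call the API, but every panel gets the cached value again, so per-panel filtering (and possibly distinctUntilChanged) is still needed.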

3. Subscribing to the right api response

But wait! Each panel is receiving every item in the cache through its subscription every time we next(helpId).

So let's filter it.

import { filter } from 'rxjs/operators'

const addPanel = (panel, helpId) => {
  help$
    .pipe(filter(meta => meta.id === helpId))
    .subscribe(data => panel.display(data))
  helpId$.next(helpId)
}

That's it. Here's a running example: https://stackblitz.com/edit/rxjs-cache-until

Reusability

By separating out the metadata part, the caching operator becomes more reusable for other circumstances:

type Porridge = {celsius: number}

const makePorridge = celsius => {
  console.log("Make porridge at " + celsius + " celsius") 
  return Observable.of({celsius})
}

const justRight = (p: Porridge) => p.celsius >= 60 && p.celsius <= 70

const porridge$ = new Subject<number>()
const cachedPorridge$ = porridge$.pipe(
  cacheUntil(justRight, makePorridge)
)

let temperature = 30
const addBear = () => {
  cachedPorridge$.subscribe(() => console.log('Add bear'))
  porridge$.next(temperature += 10)
}

for(var i = 10; i; i--) {
  addBear()
}
// between the 60˚ and 70˚ 
// `makePorridge` gets called for every new bear
// because Goldilocks keeps eating it all
answered Jan 31 '23 02:01 by Clarence Lee
