Context:
Our web app can display several help panels at the same time.
When a panel requires a given ID (for example help-id-1), we call an API, passing that ID, and get back the needed help content.
Now, for various reasons, we might display the same help panel two or more times. BUT, of course, we don't want to hit the API more than once for the same item if it was fetched without error OR is currently being fetched.
Our "producer" gives us a cold stream to retrieve it:
const getHelpContentById = (id: string) => fromPromise(
httpCallToGetHelpResultFromThirdLib(id)
).pipe(
catchError(error => of({ status: 'ERROR', item: null, id })),
// extracting the body of the response
map(getHelpItemFromResponse),
// wrapping the response into an object { status: 'SUCCESS', item, id }
map(item => setStatusOnHelpItem(item, id)),
startWith({ status: 'LOADING', item: null, id }),
)
We start the stream with an object containing a status, and later we receive another object with a new status, which is either SUCCESS or ERROR.
The expected solution should:
- Fetch from the API on the first call for a given ID
- If another subscription happens for the same ID before the previous one is done (status LOADING), it should get the same stream as the first call without fetching from the API again
- If a part of the app is displaying help-id-1 and the fetch fails, the stream should not be closed; instead it should next a value of type { status: 'ERROR', item: null, id }. This way, if another component tries to display help-id-1 again, then because the last status for that ID is ERROR, it should try to reach the API once again, and both subscribers should receive the live updates: { status: 'LOADING', item: null, id } and then either error or success
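The requirements above boil down to a small decision rule on the last known status for an ID. Here is a minimal sketch of that rule; the `HelpStatus` type and the `shouldFetch` helper are hypothetical names used only for illustration, not part of the service code:

```typescript
// Status values taken from the question; the type name itself is an assumption.
type HelpStatus = 'LOADING' | 'SUCCESS' | 'ERROR';

// Hypothetical helper: decide whether a new subscription for an ID should
// hit the API, given the last status seen for that ID (undefined = never requested).
function shouldFetch(lastStatus: HelpStatus | undefined): boolean {
  // Never requested: fetch. Last attempt failed: retry.
  // LOADING or SUCCESS: reuse the existing stream instead.
  return lastStatus === undefined || lastStatus === 'ERROR';
}
```

Whatever the final implementation looks like, it has to apply this rule per ID and push the resulting statuses to every subscriber of that ID.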
First try:
Here's the first, not-so-Rx way of doing it that I came up with:
(code extracted from a service, which is a class)
private helpItems: Map<
string,
{ triggerNewFetchForItem: () => void; obs: Observable<HelpItemWithStatus> }
> = new Map();
private getFromCacheOrFetchHelpItem(id: string): Observable<HelpItemWithStatus> {
let triggerNewFetchForItem$: BehaviorSubject<HelpItemWithStatus>;
const idNotInCache = !this.helpItems.has(id);
if (idNotInCache) {
triggerNewFetchForItem$ = new BehaviorSubject<HelpItemWithStatus>(null);
this.helpItems.set(id, {
triggerNewFetchForItem: () => triggerNewFetchForItem$.next(null),
obs: triggerNewFetchForItem$.pipe(
switchMap(() => getHelpContentById(id)),
shareReplay(1),
),
});
return this.helpItems.get(id).obs;
} else {
return this.helpItems.get(id).obs.pipe(
tap(item => {
if (item.status === ContentItemStatus.ERROR) {
this.helpItems.get(id).triggerNewFetchForItem();
}
})
);
}
}
public getHelpItemById(id: string): Observable<HelpItemWithStatus> {
return this.getFromCacheOrFetchHelpItem(id);
}
Attempt from my co-worker:
private getFromCacheOrFetchHelpItem4(id: string): Observable<HelpItemWithStatus> {
  const cached = this.items.get(id);
  if (cached && cached.status !== ContentItemStatus.ERROR) {
    return of(cached);
  }
  return getNewWrappedHelpItem(this.contentfulClient, id).pipe(
    // remember the latest value emitted for this ID
    tap(item => this.items.set(id, item)),
    shareReplay(1),
  );
}
Problems:
- you subscribe to it once, and it ends up with an error
- you subscribe to it again from a new component
- it does another fetch as expected, BUT that changes the references
- the API call succeeds; the second component is updated, the first one is not
Conclusion:
I'm sure that there's a much better Rx way of doing this, maybe even without relying on an "external cache" (the Map here). The best would obviously be to have a new operator for that :)
I have basically this same scenario at work. I've been thinking about it for a while but finally decided to take a whack at it. My solution is a bit rough, but I'll try to update the answer as I refine it. I'd love feedback on better ways to approach it.
It is a rather complicated problem so I'll build it out step by step. We will start with an api method with no parameters for simplicity.
To let concurrent subscribers share a single API call, just add the share() operator on the end to make it multicast.
return api().pipe(share());
To replay the latest result to late subscribers, just change share() to shareReplay(1). The parameter indicates the number of previous emissions to replay. We only want the last one emitted, so we pass 1.
Alternatively, you could use the tap operator to keep a reference to the last emitted value and return of(data) instead of the stream if the last call was successful. This only applies if you never want to invoke the API again (as the OP described), but I'm keeping it generic to be flexible for other solutions.
return api().pipe(shareReplay(1));
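To make the difference between the two operators concrete, here is a small sketch. It assumes RxJS 6+ import paths (the full example further down uses the older 5.5 paths), and it uses a plain Subject as a hypothetical stand-in for the API call:

```typescript
import { Subject } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';

// Hypothetical stand-in for the API response stream.
const source = new Subject<string>();
const shared = source.pipe(share());
const replayed = source.pipe(shareReplay(1));

const sharedLate: string[] = [];
const replayedLate: string[] = [];

// First subscribers connect both pipelines to the source.
shared.subscribe();
replayed.subscribe();

source.next('help content');

// Late subscribers arrive after the value was emitted.
shared.subscribe(v => sharedLate.push(v));     // receives nothing
replayed.subscribe(v => replayedLate.push(v)); // receives the replayed value
```

With share() the late subscriber only sees future emissions, while shareReplay(1) hands it the most recent one immediately.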
Refetching after an error when a new subscriber arrives is a doozy. It is easy to get the last value, or even to rerun the stream for a new subscriber, but that doesn't benefit the previous subscribers: the new subscriber might get a successful result while none of the previous subscribers are notified. Essentially, what you are asking for is to have your subject emit a new value when it gets a new subscription. As far as I am aware, that isn't possible.
My workaround was to set up my own subject that I could trigger each time someone requested the stream. It isn't the same thing, but I think it is really what we want anyway. What we really want is some way to retry. If it isn't automated with the retryWhen operator, then we want some manual trigger, such as a new component loading. When a new component loads, it requests the stream, so this works fine.
So we create a subject and call next in a timeout. I would much rather use a ReplaySubject or BehaviorSubject to avoid the timeout, but I ran into issues with Angular change detection when I did that (ExpressionChangedAfterItHasBeenCheckedError). I'll need to look deeper into it.
Note that the shareReplay is on the outer stream. We want to share that rather than the inner one. Also note that I am using switchMap rather than switchMapTo, since we want a new inner stream each time.
const trigger = new Subject<void>();
setTimeout(() => trigger.next());
return trigger.pipe(
switchMap(() => api()),
shareReplay(1)
);
To report errors as values, note that the catchError operator lets you return an observable. Since we want the error to be delivered as a message, we just do catchError(e => of(e)). The problem is that this ends the stream. The fix is to put the catch inside of the switchMap so that the inner stream can die and the outer one can keep going.
return trigger.pipe(
  switchMap(() => api().pipe(
    // catch on the inner stream so the outer stream stays alive
    catchError(err => of(err))
  )),
  shareReplay(1)
);
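The effect of where the catch is placed can be checked with a short sketch. It assumes RxJS 6+ import paths; `failingApi` is a hypothetical stand-in for an API call that always fails:

```typescript
import { Observable, Subject, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical API stand-in that always fails.
const failingApi = (): Observable<string> =>
  new Observable<string>(subscriber => subscriber.error('boom'));

const outerTrigger = new Subject<void>();
const innerTrigger = new Subject<void>();
const outerCatch: string[] = [];
const innerCatch: string[] = [];
let outerCompleted = false;
let innerCompleted = false;

// catchError on the outer stream: the first error ends the whole pipeline.
outerTrigger.pipe(
  switchMap(() => failingApi()),
  catchError(err => of(`caught ${err}`)),
).subscribe({ next: v => outerCatch.push(v), complete: () => { outerCompleted = true; } });

// catchError on the inner stream: only that inner stream ends.
innerTrigger.pipe(
  switchMap(() => failingApi().pipe(catchError(err => of(`caught ${err}`)))),
).subscribe({ next: v => innerCatch.push(v), complete: () => { innerCompleted = true; } });

outerTrigger.next();
outerTrigger.next(); // has no effect: the pipeline already completed
innerTrigger.next();
innerTrigger.next(); // runs a second attempt on the still-open pipeline
```

The outer variant delivers one error message and then completes, so later triggers are lost; the inner variant keeps accepting triggers.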
To report the request status, we will create a generic response wrapper that has a state property. The possible values are 'FETCHING', 'SUCCESS', and 'FAILURE'. We will use the startWith operator to send the fetching notification as soon as the API call begins (which is why it is at the end of the pipe).
return trigger.pipe(
  switchMap(() => api().pipe(
    map((data) => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err })),
    // emitted first, as soon as the inner stream is subscribed
    startWith({ state: 'FETCHING' })
  )),
  shareReplay(1)
);
To avoid duplicate calls, we basically want to not invoke the trigger if a request is in flight. We could do this either with a flag or by using the distinct operator with a trigger to reset it when the API call resolves. The second method is tricky, since you would need a reference to the stream while constructing it. So we will just use a variable, and either wrap the trigger.next() in an if or put a filter on the stream. I'm going to use a filter.
private state: string;
...
return trigger.pipe(
  // ignore triggers while a request is already in flight
  filter(() => this.state !== 'FETCHING'),
  switchMap(() => api().pipe(
    map((data) => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err })),
    startWith({ state: 'FETCHING' }),
    // remember the latest state for the filter above
    tap(x => { this.state = x.state; })
  )),
  shareReplay(1)
);
To stop refetching once a call has succeeded, all you have to do is not fire the trigger. So just change the condition on the trigger so that it passes only when the state is not initialized or is 'FAILURE'.
...
filter(() => this.state == null || this.state === 'FAILURE'),
...
To support API methods with parameters, you basically just need to hash the parameters and use that as a key for the map. See the full example below.
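One simple way to build such a key is to serialise the argument list. This is only a sketch under the assumption that the parameters are JSON-serialisable and that argument order matters; `jsonKey` and `keyCache` are hypothetical names:

```typescript
// Hypothetical key generator: turns the argument list into a string key.
// Assumes JSON-serialisable parameters; property order inside objects matters.
const jsonKey = (...params: unknown[]): string => JSON.stringify(params);

// The key can then index a Map that holds one entry per unique parameter set.
const keyCache = new Map<string, string>();
keyCache.set(jsonKey('help-id-1', { locale: 'en' }), 'cached entry');

const sameKeyHit = keyCache.has(jsonKey('help-id-1', { locale: 'en' }));
const otherKeyHit = keyCache.has(jsonKey('help-id-2', { locale: 'en' }));
```

For a single string ID, as in the question, the ID itself is already a usable key, so the key function can simply return it.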
Here it is all put together. I created a helper function that will generate an API method. The developer has to provide the API method and the hashing method for the parameters, since that would be overly complicated to infer.
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, switchMap, map, startWith, catchError, shareReplay, filter } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';
// possible states of the api request
export enum ApiStateType {
Fetching,
Success,
Failure
}
// wrapper for the api status messages
export interface ApiStatus<T> {
state: ApiStateType;
params: any[];
data: T
}
// information related to a stream for a unique set of parameters
interface StreamConfig<T> {
state: ApiStateType;
trigger: Subject<void>;
stream: Observable<ApiStatus<T>>;
}
export function generateCachedApi<T>(
api: (...params) => Observable<T>,
generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
const cache = new Map<string, StreamConfig<T>>();
return (...params): Observable<ApiStatus<T>> => {
const key = generateKey(...params);
let config = cache.get(key);
if (!config) {
console.log(`created new stream (${key})`);
config = <StreamConfig<T>> { trigger: new Subject<void>() };
config.stream = config.trigger.pipe(
filter(() => config.state == null || config.state === ApiStateType.Failure),
switchMap(() => {
return api(...params).pipe(
map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, params, data })),
catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, params, data })),
startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, params }),
tap(x => { config.state = x.state; })
);
}),
tap(x => { console.log('PUBLISH', x)}),
shareReplay(1),
);
cache.set(key, config);
} else {
console.log(`returned existing stream (${key})`);
}
setTimeout(() => { config.trigger.next() });
return config.stream;
}
}
Here is a running example I hacked together: https://stackblitz.com/edit/api-cache
From the question: "I'm sure that there's a way better Rx way of doing that, maybe even without relying on an 'external cache' (the Map here). The best would be obviously to have a new operator for that :)"
I created a cacheMap operator to try to do just that. I had a source that emitted the API parameters, and the cacheMap operator would find or create the stream for the unique set of parameters and return it mergeMap style. The problem is that every subscriber would then be subscribed to that inner observable, so you have to add a filter (see the alternate solution below).
Here is an alternate solution that I thought of. Instead of maintaining multiple streams you could have one primary stream and give it to subscribers with a filter.
The problem with a single stream is that the replay would apply to all parameters. So you either have to use replay with no buffer limit or manage replay on your own.
If you use replay with no buffer limit, it will replay everything from FETCHING through SUCCESS, which may cause extra processing, though it probably won't be noticeable to the user. Ideally we would have a replayByKey operator, but I haven't gotten around to writing it. So for now I am just using a map. The issue with using the map is that we are still emitting the same value to subscribers who already received it, so we add a distinctUntilChanged operator to the instance stream. Alternatively, you could create the instance stream and then put a takeUntil on it, with the trigger being the instance stream filtered for success and with a delay(0) to allow the last value through the pipe before closing it. This would complete the stream, which is OK since you never get a new value on it once you have a success. I went with distinct because it allows you to get a new value if you wanted to change the requirements.
We use a mergeMap instead of a switchMap, since we can have concurrent in-flight requests for different parameters and we don't want one request to cancel another.
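The cancellation behaviour can be seen in a small sketch. It assumes RxJS 6+ import paths; the `run` helper and the hand-resolved `pending` responses are hypothetical, standing in for two overlapping API calls:

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Runs two overlapping requests through the chosen flattening operator
// and returns the responses that reached the subscriber.
function run(flatten: 'merge' | 'switch'): string[] {
  const requests = new Subject<string>();
  const pending = new Map<string, Subject<string>>();
  const received: string[] = [];

  // Hypothetical API: each call returns a response we resolve by hand later.
  const api = (id: string): Subject<string> => {
    const response = new Subject<string>();
    pending.set(id, response);
    return response;
  };

  const flattened = flatten === 'merge'
    ? requests.pipe(mergeMap(api))
    : requests.pipe(switchMap(api));
  flattened.subscribe(v => received.push(v));

  requests.next('help-id-1');
  requests.next('help-id-2'); // second request while the first is in flight
  pending.get('help-id-1')!.next('content 1');
  pending.get('help-id-2')!.next('content 2');
  return received;
}

const merged = run('merge');    // both responses arrive
const switched = run('switch'); // the first response is dropped
```

With a single shared stream serving every key, switchMap would silently drop the response for help-id-1, which is exactly what we want to avoid.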
Here is the solution:
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, mergeMap, map, startWith, catchError, share, filter, distinctUntilChanged } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';
// possible states of the api request
export enum ApiStateType {
Fetching,
Success,
Failure
}
// wrapper for the api status messages
export interface ApiStatus<T> {
state: ApiStateType;
key: string;
params: any[];
data: T;
}
export function generateCachedApi<T>(
api: (...params) => Observable<T>,
generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
const trigger = new Subject<any[]>();
const stateCache = new Map<string, ApiStatus<T>>();
const stream = trigger.pipe(
map<any[], [any[], string]>((params) => [ params, generateKey(...params) ]),
tap(([_, key]) => {
if (!stateCache.has(key)) {
stateCache.set(key, <ApiStatus<T>> {})
}
}),
mergeMap(([params, key]) => {
const apiStatus = stateCache.get(key);
if (apiStatus.state === ApiStateType.Fetching || apiStatus.state === ApiStateType.Success) {
return of(apiStatus);
}
return api(...params).pipe(
map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, key, params, data })),
catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, key, params, data })),
startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, key, params }),
tap(state => { stateCache.set(key, state); })
)
}),
tap(x => { console.log('PUBLISH', x)}),
share()
);
return (...params): Observable<ApiStatus<T>> => {
const key = generateKey(...params);
const instanceStream = stream.pipe(
filter((response) => response.key === key),
distinctUntilChanged()
);
setTimeout(() => { trigger.next(params) });
return instanceStream;
}
}
Let's separate our concerns and deal with them one at a time:
- Wrap each response in a metadata object {id, status, item}, with an initial value, instead of allowing errors (compare materialize())
We'll make a higher-order function that takes your getHelpContentById() and returns an observable of metadata objects that first emits 'LOADING', then 'SUCCESS' or 'ERROR' depending on the response:
const toMeta = fetch => id =>
fetch(id)
.pipe(
map(response => ({id, status: 'SUCCESS', item: response})),
catchError(e => [{id, status: 'ERROR', item: null}]),
startWith( {id, status: 'LOADING', item: null}),
)
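To see the emission order, here is a typed restatement of toMeta together with two hypothetical fetchers, `okFetch` and `badFetch`, that succeed and fail synchronously. It assumes RxJS 6+ import paths, and the `Meta` type name is my own:

```typescript
import { Observable, of } from 'rxjs';
import { catchError, map, startWith } from 'rxjs/operators';

// Assumed shape of the metadata object described above.
type Meta<T> = { id: string; status: 'LOADING' | 'SUCCESS' | 'ERROR'; item: T | null };

const toMeta = <T>(fetch: (id: string) => Observable<T>) => (id: string): Observable<Meta<T>> => {
  const loading: Meta<T> = { id, status: 'LOADING', item: null };
  const failed: Meta<T> = { id, status: 'ERROR', item: null };
  return fetch(id).pipe(
    map((response): Meta<T> => ({ id, status: 'SUCCESS', item: response })),
    catchError(() => of(failed)), // turn the error into a regular value
    startWith(loading),           // emitted before the fetch result
  );
};

// Hypothetical fetchers standing in for getHelpContentById().
const okFetch = (id: string): Observable<string> => of(`content for ${id}`);
const badFetch = (_id: string): Observable<string> =>
  new Observable<string>(subscriber => subscriber.error(new Error('boom')));

const okStatuses: string[] = [];
const badStatuses: string[] = [];
toMeta(okFetch)('help-id-1').subscribe(m => okStatuses.push(m.status));
toMeta(badFetch)('help-id-1').subscribe(m => badStatuses.push(m.status));
```

In both cases the subscriber sees 'LOADING' first, followed by either 'SUCCESS' or 'ERROR', and the stream never errors out.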
Let's keep the caching ignorant of two things: when a cached item should be dropped, and how an item is produced. Both bits of logic can be passed in as functions to our caching operator:
/**
* `predicate` returns a boolean
* `action` returns an observable
*/
const cacheUntil = (predicate, action) => {
const cache = new Map()
// Use our predicate to check if we should uncache an item
//
const cacheUpdate = key => cachedValue => {
if (predicate(cachedValue)) cache.delete(key)
}
// Prep an item, then cache it
//
const cache_set = key => {
const out$ = action(key) // fetching it in our case
.pipe(
tap(cacheUpdate(key)), // see cacheUpdate above this function
shareReplay(1) // make Observable returned from `action`
// a Hot observable
// so that we don't fetch
// on new subscriptions
)
cache.set(key, out$)
return out$
}
// Glue it all together
//
return pipe(
mergeMap(key => // flatten the cached Observable returned from `action()`
cache.has(key)
? cache.get(key)
: cache_set(key)
)
)
}
// make a stream of help ids to fetch from api
// that we can call `next(helpId)` on
const helpId$ = new Subject()
// use a higher-order function to make our apiCall
// return our metadata object of `{id, status, item}`
const help$ = helpId$.pipe(
cacheUntil(hasMetaError, toMeta(getHelpContentById))
)
And the predicate for our first parameter, fairly self-explanatory:
const hasMetaError = meta =>
meta.status === 'ERROR'
With that, we can push help ids through a subject and get an observable of cached api calls:
const helpId$ = new Subject()
const help$ = helpId$.pipe(
  cacheUntil(hasMetaError, toMeta(getHelpContentById))
)
// When we need a panel...
//
const addPanel = (panel, helpId) => {
  help$.subscribe(data => panel.display(data))
  helpId$.next(helpId)
}
But wait! Each panel is receiving every item in the cache through its subscription every time we next(helpId).
So let's filter it.
const addPanel = (panel, helpId) => {
  help$
    // only pass along metadata for this panel's own help id
    .pipe(filter(meta => meta.id === helpId))
    .subscribe(data => panel.display(data))
  helpId$.next(helpId)
}
That's it. Here's a running example: https://stackblitz.com/edit/rxjs-cache-until
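To check the caching behaviour without the StackBlitz, here is a compact, typed restatement of cacheUntil with a hypothetical `fakeFetch` that fails on its first call and succeeds afterwards. It assumes RxJS 6+ import paths and is a sketch, not a drop-in replacement:

```typescript
import { Observable, OperatorFunction, Subject, of } from 'rxjs';
import { mergeMap, shareReplay, tap } from 'rxjs/operators';

// Typed restatement of the operator above, repeated so the sketch runs on its own.
const cacheUntil = <K, V>(
  predicate: (value: V) => boolean,
  action: (key: K) => Observable<V>,
): OperatorFunction<K, V> => {
  const cache = new Map<K, Observable<V>>();
  const fetchAndCache = (key: K): Observable<V> => {
    const out$ = action(key).pipe(
      tap(value => { if (predicate(value)) cache.delete(key); }), // evict on match
      shareReplay(1),
    );
    cache.set(key, out$);
    return out$;
  };
  return mergeMap((key: K) => (cache.has(key) ? cache.get(key)! : fetchAndCache(key)));
};

type SimpleMeta = { id: string; status: 'SUCCESS' | 'ERROR' };

let calls = 0;
// Hypothetical API: fails on the first call, succeeds on every later call.
const fakeFetch = (id: string): Observable<SimpleMeta> => {
  calls += 1;
  const meta: SimpleMeta = { id, status: calls === 1 ? 'ERROR' : 'SUCCESS' };
  return of(meta);
};

const ids$ = new Subject<string>();
const statuses: string[] = [];
ids$
  .pipe(cacheUntil((m: SimpleMeta) => m.status === 'ERROR', fakeFetch))
  .subscribe(m => statuses.push(m.status));

ids$.next('help-id-1'); // first call fails, so the entry is evicted
ids$.next('help-id-1'); // refetched, succeeds, and stays cached
ids$.next('help-id-1'); // served from the cache, no new call
```

Three requests for the same ID result in only two calls to the fetcher: the failed one and the retry.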
By separating out the metadata part, the caching operator becomes more reusable for other circumstances:
type Porridge = {celsius: number}
const makePorridge = celsius => {
console.log("Make porridge at " + celsius + " Celsius")
return Observable.of({celsius})
}
const justRight = (p: Porridge) => p.celsius >= 60 && p.celsius <= 70
const porridge$ = new Subject<number>()
const cachedPorridge$ = porridge$.pipe(
cacheUntil(justRight, makePorridge)
)
let temperature = 30
const addBear = () => {
cachedPorridge$.subscribe(() => console.log('Add bear'))
porridge$.next(temperature += 10)
}
for(var i = 10; i; i--) {
addBear()
}
// between the 60˚ and 70˚
// `makePorridge` gets called for every new bear
// because Goldilocks keeps eating it all