
Using RxJS switchMap to only unsubscribe from streams with the same request URL/action payload (redux-observable epics)

I have an interface where the user may trigger calls to the same endpoint but with different params (in this case UUIDs). Up until now I've been enjoying the behavior of switchMap canceling my in-flight http requests whenever I dispatch a new redux action with the same type, and in this case I still want that behavior, but only if the new action's requested UUID (part of the action object) is the same as one that's already in progress. I'm not exactly sure of the right approach to this.

For example, after dispatching several actions at once, I'd hope that all of the ones with unique ids complete, but those that repeat an existing and not yet completed id cancel the previous request and take its place.

ex:

store.dispatch({ type: "GET_SOME_DATA", uuid: "1" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "2" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "2" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "3" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "2" })
// Get results back for '1', then '3', then '2' assuming equal response times.
// Only the duplicate uuid calls were cancelled, even though all have the same 'type'
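To make the expected outcome concrete, here is a small dependency-free model of the two cancellation rules. It is only a sketch and not RxJS: the `at` timestamps, the fixed `latency`, and the function names `switchAll`, `switchPerUuid`, and `survivors` are assumptions made up for illustration.

```javascript
// Hypothetical model, not RxJS: each dispatch is { uuid, at }, where `at` is the
// dispatch time in ms, and every request takes `latency` ms to respond.
// Dispatches are assumed to be listed in chronological order.

// Plain switchMap: any later dispatch arriving before the response cancels the request.
function switchAll(dispatches, latency) {
  return survivors(dispatches, latency, () => true);
}

// Desired behavior: only a later dispatch with the same uuid cancels the request.
function switchPerUuid(dispatches, latency) {
  return survivors(dispatches, latency, (earlier, later) => earlier.uuid === later.uuid);
}

// Returns the uuids whose requests complete, in completion order.
function survivors(dispatches, latency, cancels) {
  return dispatches
    .filter((d, i) =>
      !dispatches.some((later, j) =>
        j > i && later.at < d.at + latency && cancels(d, later)))
    .sort((a, b) => a.at - b.at) // equal latency, so completion order follows dispatch order
    .map((d) => d.uuid);
}

const dispatches = [
  { uuid: '1', at: 0 },
  { uuid: '2', at: 1 },
  { uuid: '2', at: 2 },
  { uuid: '3', at: 3 },
  { uuid: '2', at: 4 },
];
console.log(switchAll(dispatches, 100));     // [ '2' ]
console.log(switchPerUuid(dispatches, 100)); // [ '1', '3', '2' ]
```

With the global rule only the last dispatch survives; with the per-uuid rule the results come back for '1', then '3', then '2', which is the order I'm after.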

I've tried using .distinctUntilChanged((a, b) => a.uuid === b.uuid) to filter the stream's input into .switchMap just to see what would happen, but that merely limits what actions reach the switchMap, and the behavior of canceling all but the most recent GET_SOME_DATA action related API call still occurs.

// Assuming RxJS 5, where importing from 'rxjs' patches all operators onto Observable
import { Observable } from 'rxjs';
import { ajax } from 'rxjs/observable/dom/ajax';

const getDataEpic = (action$) =>
  action$.ofType(GET_SOME_DATA)
    .switchMap(({ uuid }) => // would be great if the switchMap would only cancel existing streams with same uuid
      ajax.getJSON(`/api/datastuff/${uuid}`)
        .map((data) => successAction(uuid, data.values))
        .catch((err) => Observable.of(
          errorAction(uuid),
          setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
        ))
    )

For now, I'm using mergeMap, but I'm worried that it could cause the same issue I've run into with live searching: an older request might resolve after the most recent one and update my redux store with stale data, since mergeMap doesn't cancel the inner Observable streams like switchMap does. Is there a way for me to look at current RxJS Ajax requests and cancel those with the new action's url, or a better solution that I'm clearly missing?

Cheers!

Edit: I'm wondering if changing switchMap to mergeMap, then chaining takeUntil and canceling on other GET_SOME_DATA actions with the same uuid would be a proper approach, or if that would just make all requests cancel immediately? For example:

const getDataEpic = (action$) =>
  action$.ofType(GET_SOME_DATA)
    .mergeMap(({ uuid }) =>
      ajax.getJSON(`/api/datastuff/${uuid}`)
        // stop waiting for this response once a later action asks for the same uuid
        .takeUntil(
          action$.ofType(GET_SOME_DATA).filter(laterAction => laterAction.uuid === uuid)
        )
        .map((data) => successAction(uuid, data.values))
        .catch((err) => Observable.of(
          errorAction(uuid),
          setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
        ))
    )

Edit2: The takeUntil addition appears to be working! I'm not sure if it's 100% on the up and up in terms of being appropriate, but I'd love some feedback. I'd also like to support a manual cancellation option; would the method discussed here be the proper implementation?

Edit3: I think this is my final, working version. I removed the destructuring of the Redux action in the mergeMap just in case somebody newer to redux-observable comes across this:

const getDataEpic = (action$) =>
  action$.ofType(GET_SOME_DATA)
    .mergeMap((action) =>
      ajax.getJSON(`/api/datastuff/${action.uuid}`)
        // cancel on a manual cancel or on a newer request, but only for the same uuid
        .takeUntil(Observable.merge(
          action$.ofType(MANUALLY_CANCEL_GETTING_DATA)
            .filter((cancelAction) => cancelAction.uuid === action.uuid),
          action$.ofType(GET_SOME_DATA)
            .filter((laterAction) => laterAction.uuid === action.uuid),
        ))
        .map((data) => successAction(action.uuid, data.values))
        .catch((err) => Observable.of(
          errorAction(action.uuid),
          setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
        ))
    )

And the observed network behavior from rapidly clicking everything in sight. Only the non-duplicated id requests went through!
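Stripped of RxJS, the bookkeeping in that epic amounts to keeping at most one live request per uuid. A rough sketch of that idea follows; `KeyedTasks` and its methods are hypothetical names, not part of RxJS or redux-observable. Note one difference: this version only marks stale results so they can be ignored, whereas unsubscribing from the ajax Observable via takeUntil also aborts the underlying request.

```javascript
// Hypothetical helper: tracks one cancellable task per key, mirroring
// mergeMap + takeUntil filtered by uuid.
class KeyedTasks {
  constructor() {
    this.active = new Map(); // key -> token of the task currently in flight
  }

  // Start a task for `key`, cancelling any unfinished task with the same key.
  start(key) {
    this.cancel(key);
    const token = { key, cancelled: false };
    this.active.set(key, token);
    return token;
  }

  // Manual cancellation, the counterpart of MANUALLY_CANCEL_GETTING_DATA.
  cancel(key) {
    const token = this.active.get(key);
    if (token) {
      token.cancelled = true;
      this.active.delete(key);
    }
  }

  // Call when a response arrives; returns true only if the result should be used.
  finish(token) {
    if (token.cancelled || this.active.get(token.key) !== token) return false;
    this.active.delete(token.key);
    return true;
  }
}

const tasks = new KeyedTasks();
const first = tasks.start('2');
const second = tasks.start('2'); // same uuid, so `first` is cancelled
console.log(tasks.finish(first), tasks.finish(second)); // false true
```

Tasks with different keys never affect each other, which is the same property the uuid filter inside takeUntil provides.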

Shawn Stern asked Feb 21 '18 06:02


1 Answer

You can also use the groupBy operator to handle streams with the same uuid and apply the useful switchMap behavior on every uuid action stream:

action$.ofType(GET_SOME_DATA)
.groupBy(
    ({ uuid }) => uuid, // group all the actions by uuid
    x => x,
    group$ => group$.switchMap(_ => Observable.timer(5000)) // close existing streams if no event of a grouped action is emitted 5 seconds in a row (prevents memory leaks)
)
.mergeMap(actionsGroupedByUuid$ => 
    actionsGroupedByUuid$.switchMap(({ uuid }) => 
        ajax.getJSON(`/api/datastuff/${uuid}`)
            .map((data) => successAction(uuid, data.values))
            .catch((err) => Observable.of(
                errorAction(uuid),
                setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
            )) 
    )
);
ZahiC answered Oct 17 '22 23:10