
How to reset an RxJS scan operator based on another Observable

I have a component which triggers an onScrollEnd event when the last item in a virtual list is rendered. This event makes a new API request to fetch the next page and merges the new items with the previous results using the scan operator.

This component also has a search field which triggers an onSearch event.

How do I clear the previous accumulated results from the scan operator when a search event is triggered? Or do I need to refactor my logic here?

const loading$ = new BehaviorSubject(false);
const offset$ = new BehaviorSubject(0);
const search$ = new BehaviorSubject(null);

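// combineLatest emits [offset, searchterm] pairs, which switchMap destructures below
// (merge would emit the bare values one at a time).
const options$: Observable<any[]> = combineLatest(offset$, search$).pipe(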
  // 1. Start the loading indicator.
  tap(() => loading$.next(true)),
  // 2. Fetch new items based on the offset.
  switchMap(([offset, searchterm]) => userService.getUsers(offset, searchterm)),
  // 3. Stop the loading indicator.
  tap(() => loading$.next(false)),
  // 4. Complete the Observable when there is no 'next' link.
  takeWhile((response) => response.links.next),
  // 5. Map the response.
  map(({ data }) =>
    data.map((user) => ({
      label: user.name,
      value: user.id
    }))
  ),
  // 6. Accumulate the new options with the previous options.
  scan((acc, curr) => {
    // TODO: Don't merge on search$.next
    return [...acc, ...curr];
  })
);

// Fetch next page
onScrollEnd: (offset: number) => offset$.next(offset);
// Fetch search results
onSearch: (term) => {
  search$.next(term);
};
Ritchie asked May 27 '19 17:05




3 Answers

To manipulate the state of a scan, you can write higher-order functions that each return an update function taking the old state and producing the new one. Map every source stream to its update function, then combine the streams with merge. This way you stick to a clean, stream-oriented solution without any side effects.

const { Subject, merge } = rxjs;
const { scan, map } = rxjs.operators;

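const add$ = new Subject();
const clear$ = new Subject();

// Each helper returns a function that maps the old state to the new state.
const add = (value) => (state) => [...state, value];
const clear = () => (state) => [];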

const result$ = merge(
  add$.pipe(map(add)),
  clear$.pipe(map(clear))
).pipe(
  scan((state, innerFn) => innerFn(state), [])
)

result$.subscribe(result => console.log(...result))

add$.next(1)
add$.next(2)
clear$.next()
add$.next(3)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

This method can easily be extended or adapted for other state use cases in RxJS.

Example (remove last item)

const removeLast$ = new Subject();

const removeLast = () => (state) => state.slice(0, -1);

merge(
  // ...
  removeLast$.pipe(map(removeLast)),
  // ...
)
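
Applied to the question, the same pattern could look roughly like the sketch below. It is only an illustration: `Option`, `appendPage`, `resetTo` and `runningFold` are hypothetical names, and `runningFold` is a plain-TypeScript stand-in for `scan((state, update) => update(state), [])` so the update functions can be checked without RxJS.

```typescript
// Hypothetical option shape, mirroring the mapped users in the question.
interface Option {
  label: string;
  value: number;
}

// A state update takes the previous options and returns the next ones.
type Update = (state: Option[]) => Option[];

// Next page arrived: append it to what is already shown.
const appendPage = (page: Option[]): Update => (state) => [...state, ...page];

// New search arrived: drop the old results and start from its first page.
const resetTo = (page: Option[]): Update => () => page;

// Stand-in for scan((state, update) => update(state), []):
// collects every intermediate state, as scan would emit them.
function runningFold(updates: Update[]): Option[][] {
  const emitted: Option[][] = [];
  let state: Option[] = [];
  for (const update of updates) {
    state = update(state);
    emitted.push(state);
  }
  return emitted;
}

const ann: Option = { label: "Ann", value: 1 };
const bob: Option = { label: "Bob", value: 2 };
const cy: Option = { label: "Cy", value: 3 };

const states = runningFold([
  appendPage([ann]), // first page
  appendPage([bob]), // scroll end: second page is appended
  resetTo([cy]),     // search: accumulated results are dropped
]);

console.log(states.map((s) => s.map((o) => o.label)));
```

In a real pipeline, the scroll-driven requests would be mapped with `appendPage` and the search-driven requests with `resetTo` before being merged into the scan.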

Jonathan Stellwag answered Oct 08 '22 15:10


I think you could achieve what you want just by restructuring your chain (I'm omitting tap calls that trigger loading for simplicity):

search$.pipe(
  switchMap(searchterm =>
    concat(
      userService.getUsers(0, searchterm),
      offset$.pipe(concatMap(offset => userService.getUsers(offset, searchterm))),
    ).pipe(
      map(({ data }) => data.map((user) => ({
        label: user.name,
        value: user.id
      }))),
      scan((acc, curr) => [...acc, ...curr], []),
    ),
  ),
);

Every emission from search$ will create a new inner Observable with its own scan that will start with an empty accumulator.

martin answered Oct 08 '22 17:10


Found a working solution: I check the current offset by using withLatestFrom before the scan operator and reset the accumulator if needed based on this value.

Stackblitz demo
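
The demo itself isn't reproduced here, so the following is only a guess at the shape of that accumulator, not the exact code. It assumes that a new search also resets the offset to 0, and that the mapped page reaches scan paired with the offset, for example via `withLatestFrom(offset$)`. `Option` and `accumulate` are hypothetical names, and `Array.prototype.reduce` stands in for scan so the logic can be checked without RxJS.

```typescript
// Hypothetical option shape, mirroring the mapped users in the question.
interface Option {
  label: string;
  value: number;
}

// Accumulator intended for scan(accumulate, []).
// Each emission is [page, offset], the pair withLatestFrom(offset$) would produce.
// Assumption: offset 0 means "first page of a new result set", so start over.
function accumulate(acc: Option[], [page, offset]: [Option[], number]): Option[] {
  return offset === 0 ? page : [...acc, ...page];
}

const emissions: [Option[], number][] = [
  [[{ label: "Ann", value: 1 }], 0],  // initial load
  [[{ label: "Bob", value: 2 }], 10], // scroll end: append
  [[{ label: "Cy", value: 3 }], 0],   // new search: offset back to 0, reset
  [[{ label: "Dee", value: 4 }], 10], // scroll end within the new search
];

// reduce returns only the final state; scan would also emit each step.
const finalOptions = emissions.reduce<Option[]>(accumulate, []);

console.log(finalOptions.map((o) => o.label));
```

The design relies on the offset as the reset signal, so it only works if nothing else ever requests offset 0 in the middle of a result set.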

Ritchie answered Oct 08 '22 16:10