I have a component which triggers an onScrollEnd event when the last item in a virtual list is rendered. This event will do a new API request to fetch the next page and merge them with the previous results using the scan operator.
This component also has a search field which triggers an onSearch event.
How do I clear the previous accumulated results from the scan operator when a search event is triggered? Or do I need to refactor my logic here?
const loading$ = new BehaviorSubject(false);
const offset$ = new BehaviorSubject(0);
const search$ = new BehaviorSubject(null);
const options$: Observable<any[]> = merge(offset$, search$).pipe(
  // 1. Start the loading indicator.
  tap(() => loading$.next(true)),
  // 2. Fetch new items based on the offset.
  switchMap(([offset, searchterm]) => userService.getUsers(offset, searchterm)),
  // 3. Stop the loading indicator.
  tap(() => loading$.next(false)),
  // 4. Complete the Observable when there is no 'next' link.
  takeWhile((response) => response.links.next),
  // 5. Map the response.
  map(({ data }) =>
    data.map((user) => ({
      label: user.name,
      value: user.id
    }))
  ),
  // 6. Accumulate the new options with the previous options.
  scan((acc, curr) => {
    // TODO: Dont merge on search$.next 
    return [...acc, ...curr]);
  }
);
// Fetch next page
onScrollEnd: (offset: number) => offset$.next(offset);
// Fetch search results
onSearch: (term) => {
  search$.next(term);
};
                To manipulate the state of a scan you can write higher order functions that get the old state and the new update. Combine then with the merge operator. This way you stick to a clean stream-oriented solution without any side-effects.
const { Subject, merge } = rxjs;
const { scan, map } = rxjs.operators;
add$ = new Subject();
clear$ = new Subject();
add = (value) => (state) => [...state, value];
clear = () => (state) => [];
const result$ = merge(
  add$.pipe(map(add)),
  clear$.pipe(map(clear))
).pipe(
  scan((state, innerFn) => innerFn(state), [])
)
result$.subscribe(result => console.log(...result))
add$.next(1)
add$.next(2)
clear$.next()
add$.next(3)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
This method can easily be extended and/or adapted for other state usecases in rxjs.
Example (remove last item)
removeLast$ = new Subject()
removeLast = () => (state) => state.slice(0, -1);
merge(
  ..
  removeLast$.pipe(map(removeLast)),
  ..
)
                        I think you could achieve what you want just by restructuring your chain (I'm omitting tap calls that trigger loading for simplicity):
search$.pipe(
  switchMap(searchterm =>
    concat(
      userService.getUsers(0, searchterm),
      offset$.pipe(concatMap(offset => userService.getUsers(offset, searchterm)))),
    ).pipe(
      map(({ data }) => data.map((user) => ({
        label: user.name,
        value: user.id
      }))),
      scan((acc, curr) => [...acc, ...curr], []),
    ),
  ),
);
Every emission from search$ will create a new inner Observable with its own scan that will start with an empty accumulator.
Found a working solution: I check the current offset by using withLatestFrom before the scan operator and reset the accumulator if needed based on this value.
Stackblitz demo
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With