Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

When using rxjs why doesn't switchMap trigger a complete event?

Recently, in my Angular app, I've started to use the rxjs switchMap operator in a couple of different scenarios. I soon realised that when using switchMap, when you subscribe to this stream, the completion block does not fire (I don't think error block does either). All the examples I've seen online don't seem to handle a completion block either, and I'm baffled as to what the reason is for this?

I'm obviously missing something in regard to switchMap or how it is used, but I don't know what.

I'd ideally like to call a function with triggers a Http request, and then deal with the error in the error block and then handle post-request stuff in the completion block.

Here's my example of what I'm doing:

export class ResultsComponent {

  ngAfterViewInit() {

    Observable.combineLatest(...filters)
        .debounceTime(500)
        .distinctUntilChanged()
        .switchMap((activeFilters: Array<ActiveFilter>) => {
            const filters = this.mapFilters(activeFilters);
            return this.doSearch(this.term$.getValue(), filters);
        })
        .subscribe((res) => {
           this.onSearchSuccess(res);
        },
        (err) => {
            // THIS NEVER FIRES
            console.error(err);
            this.loading$.next(false);
        ,() => {
            // THIS NEVER FIRES
            this.loading$.next(false);
        });
  }

  private doSearch(input: string, filters: object): Observable<object> {
    return this.searchService.search(input, filters);
  }
}

service

export class SearchService {

  private baseUrl: string = 'http://mydomainhere.com/api';

  constructor(private http: Http) {}

  public search(input: string, filters: object): Observable<object> {
    const params = {
      "keyword": input,
      "filters": filters
    };
    const url = `${this.baseUrl}/search`;
    return this.http.post(url, params)
       .map(res => res.json())
       .catch(this.handleError);
  }
}
like image 341
Rich Avatar asked Oct 31 '17 09:10

Rich


People also ask

How to avoid switchmap-related bugs in RxJS?

From the summarised recommendations in RxJS: Avoiding switchMap-related Bugs, we know that: mergeMap should not be used — the ordering of the results is important; switchMap could be used — when a new search is made, pending results are no longer needed; and exhaustMap should not be used — searches for new, partial addresses should not be ignored.

Is switchmap optimal for RxJS?

In a response to RxJS: Avoiding switchMap-related Bugs, Martin Hochel mentioned a classic use case for switchMap. For the use case to which he referred, switchMap is not only valid; it’s optimal. And it’s worth looking at why.

When should mergemap not be used?

mergeMap should not be used — the ordering of the results is important; switchMap could be used — when a new search is made, pending results are no longer needed; and exhaustMap should not be used — searches for new, partial addresses should not be ignored.

How many subscriptions does switchmap have?

Remember, switchMap maintains only one inner subscription at a time, this can be seen clearly in the first example. Be careful though, you probably want to avoid switchMap in scenarios where every request needs to complete, think writes to a database. switchMap could cancel a request if the source emits quickly enough.


1 Answers

Completing the Outer Observable with the Inner

There are a number of ways to cause the outer observable to complete with the inner. (The next section explains why you might not want to do this, followed by an example of detecting inner observable completion when the outer observable doesn't complete.)

If you know your inner observable will only emit one value before completing, like with an API call, you can just pipe first onto your outer observable.

const { of , pipe } = rxjs;
const { switchMap, first } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4)),
    first()
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

But if the inner observable emits multiple values a simple change would be to use endWith and takeWhile to tell the outer observable when to complete. This assumes we know the inner observable will never emit null.

const { of , pipe } = rxjs;
const { switchMap, endWith, takeWhile } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      endWith(null)
    )),
    takeWhile((x) => x != null)
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

A general solution is to have a Subject emit when the inner observable completes, and have the outer observable complete when the Subject emits, watching for it with takeUntil.

const { of , pipe, Subject } = rxjs;
const { switchMap, tap, takeUntil } = rxjs.operators;

const innerComplete = new Subject();

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      tap({
        complete: () => innerComplete.next()
      })
    )),
    takeUntil(innerComplete)
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

Why Doesn't It Complete?

When I first started working with RxJS I was mainly converting existing API calls to be handled with observables. In practice this meant that an outer observable would complete when the inner observable completed. But it is important to note the outer observable was not caused to complete because the inner observable did. It completed because it would only emit one value. If it was an observable that could emit multiple values, like from mouse click events, it would not complete with the inner observable.

This is a good thing. It allows you to have an outer observable that maps its emissions through an inner observable, without it completing the first time the inner observable does. For example, lets say you wanted to trigger an animation on each mouse click and the animation is controlled by a timer. The mouse clicks would be emitted by the outer observable. And the inner observable would run a timer for a few seconds to control the animation. After the animation completes, you'd still like mouse click events to be captured so the animation can start up again.

The following snippet will log a series of numbers to the console (our makeshift animation) on each click. And since we're using switchMap the previous "animation" will stop if you click in the middle of it (The concatMap piece just adds a delay between each emission). You can see this visually in the marble diagram for switchMap at https://rxmarbles.com/#switchMap

const { of , pipe, fromEvent, Subject } = rxjs;
const { switchMap, concatMap, delay } = rxjs.operators;

const innerComplete = new Subject();

const stream = fromEvent(document, 'click').pipe(
    switchMap(() => of(1, 2, 3).pipe(
      concatMap(x => of(x).pipe(delay(500)))
    ))
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

<p>Click here and watch the console.</p>

Acting on Inner Observable Completion

Given that it makes sense that the outer observable doesn't need to complete when the inner observable does you may want a way to do something when the inner observable completes without having to complete the outer observable. tap will let you do that when you pass an Observer as an argument.

const { of , pipe } = rxjs;
const { switchMap, tap } = rxjs.operators;

const stream = of (1, 2, 3).pipe(
    switchMap(() => of (4, 5, 6).pipe(tap({
      complete: () => console.log("Inner observable completed")
    }))))
    .subscribe({
      next: (x) => console.log(x),
      complete: () => console.log('Outer observable completed')
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
like image 99
Rob Mosher Avatar answered Oct 09 '22 20:10

Rob Mosher