
How to stop subscription by using multiple conditions with takeUntil

I want to stop an observable subscription when either of two conditions is met:

  • Time (using import { timer } from 'rxjs/internal/observable/timer';)

OR

  • Execution status (using the object returned from the request that you'll see below)

What is happening:

It only stops execution based on time (using import { timer } from 'rxjs/internal/observable/timer';)

This is my current code:

The names of attributes, variables and their values have been changed for example purposes:

import { finalize } from 'rxjs/internal/operators/finalize';
import { interval } from 'rxjs/internal/observable/interval';
import { timer } from 'rxjs/internal/observable/timer';
import { takeUntil, first } from 'rxjs/operators';
import { merge, EMPTY, of } from 'rxjs';

.
. // Attributes and Class declaration here
.


async startProcess(): Promise<void> {

    this.isProcessLoading = true;

    const { someId } = await this.exampleService.execute().toPromise();

    const interval$ = interval(1000);
    const timeLimiter$ = timer(10000);

    const request$ = this.exampleService.doRequest();
    
    const isEventFinished$ = EMPTY;

    // My goal here is for the result of this function to return an observable that notifies 
    // if the first parameter emits an event OR if the second parameter emits another. That is, I want to notify if any condition is valid
    const stopConditions$ = merge(isEventFinished$, timeLimiter$);

    const handleSuccess = (object: MyType) => {

      if (object.status === 'FINALIZED') {

        this.object = object;
        isEventFinished$.subscribe();
      }
    };

    const handleError = () =>  this.showErrorComponent = true;

    interval$
    .pipe(takeUntil(stopConditions$))
    .pipe(finalize(() => this.isSimulationLoading = false))
    .subscribe(() => request$.subscribe(handleSuccess, handleError));
}

The code "works" because timeLimiter$ triggers takeUntil after 10 seconds. However, I want to be able to stop before the time limit.

I want takeUntil to be triggered from here too:

isEventFinished$.subscribe()

If the snippet above worked as intended, it would stop interval$, but it does not. That is my problem.

What I have already tried:

  1. I don't know whether using two pipes makes any difference compared with a single one like this: .pipe(takeUntil(stopConditions$), finalize(() => this.isSimulationLoading = false)). In any case, I tried it and it did not work.

  2. I tried replacing isEventFinished$ with const isEventFinished$ = of(1) and its subscription with timeLimiter$.pipe(first()).subscribe(). This does not work either. In fact, it prevents the request from being executed (I don't know why).

Joao Albuquerque asked Jul 16 '20 22:07




1 Answer

I just tried this code in a Stackblitz and it "worked", but I'm not sure exactly what you are trying to do. I then made a few updates to better see what was going on.

See the Stackblitz here: https://stackblitz.com/edit/angular-takeuntil-deborahk

The key change:

const stopConditions$ = merge(this.isEventFinished$, timeLimiter$).pipe(
  tap(s => console.log("stop", s))
);

interval$.pipe(takeUntil(stopConditions$)).subscribe({
  next: handleSuccess,
  error: handleError,
  complete: () => {
    this.isSimulationLoading = false;
    console.log("isSimulationLoading", this.isSimulationLoading)
  }
});

Does that help?

EDIT: I added a "Finished" button to emulate whatever action would cause the finish operation.

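Define isEventFinished$ as an Observable you can emit into by declaring it as a Subject or BehaviorSubject. A BehaviorSubject has a default value, which it emits immediately to each new subscriber; a Subject does not, which makes it the better fit for a takeUntil notifier.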

  isEventFinished$ = new Subject<boolean>();

Then, whenever the finished event occurs, use the next method to emit a value into the isEventFinished$ stream.

this.isEventFinished$.next(true);
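To see why the Subject matters, here is a minimal sketch comparing it with the EMPTY notifier from the question. EMPTY completes without ever emitting, and takeUntil only reacts to an emitted value, so subscribing to EMPTY can never stop the stream. The names source$, withEmpty and withSubject are made up for this demo, and plain Subjects stand in for interval(1000) and timer(10000) so that it runs synchronously.

```typescript
import { EMPTY, Subject, merge } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Assumption: Subjects stand in for interval(1000) and timer(10000)
// so the demo is synchronous. They are not part of the original code.
const source$ = new Subject<number>();
const timeLimiter$ = new Subject<void>();

// Case 1: EMPTY as the "finished" notifier. It completes without
// emitting, so takeUntil is never triggered by it.
const withEmpty: number[] = [];
source$
  .pipe(takeUntil(merge(EMPTY, timeLimiter$)))
  .subscribe(v => withEmpty.push(v));

// Case 2: a Subject as the "finished" notifier. It emits only when
// next() is called, so the stream can be stopped on demand.
const isEventFinished$ = new Subject<boolean>();
const withSubject: number[] = [];
let completed = false;
source$
  .pipe(takeUntil(merge(isEventFinished$, timeLimiter$)))
  .subscribe({
    next: v => withSubject.push(v),
    complete: () => { completed = true; },
  });

source$.next(0);
source$.next(1);
isEventFinished$.next(true); // the "finished" event: stops case 2 only
source$.next(2);             // still received by case 1

console.log(withEmpty);   // values seen with the EMPTY notifier
console.log(withSubject); // values seen with the Subject notifier
console.log(completed);   // whether case 2 has completed
```

A plain Subject is used rather than a BehaviorSubject because a BehaviorSubject would hand its current value to takeUntil as soon as it is subscribed, which would stop the stream before the first value arrives.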

Then this code should work:

const stopConditions$ = merge(this.isEventFinished$, timeLimiter$).pipe(
  tap(s => console.log("stop", s))
);

interval$.pipe(takeUntil(stopConditions$)).subscribe({
  next: handleSuccess,
  error: handleError,
  complete: () => {
    this.isSimulationLoading = false;
    console.log("isSimulationLoading", this.isSimulationLoading);
  }
});

See the updated blitz.

Does that work?
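One alternative that is not part of the code above: if the goal is to repeat the request on every tick until its status is 'FINALIZED' or the time limit passes, the status check can live in the pipeline itself through takeWhile, with no separate notifier. This is only a sketch under some assumptions: RxJS 6.4 or later (for the inclusive flag of takeWhile), a MyType reduced to a status field, and hypothetical names pollUntilFinalized and fakeRequest. Subjects again stand in for interval(1000) and timer(10000) so that the demo runs synchronously.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { switchMap, takeUntil, takeWhile } from 'rxjs/operators';

// Assumption: only the status field of MyType matters here.
interface MyType {
  status: string;
}

// Hypothetical helper: runs doRequest on every tick and stops after the
// first FINALIZED result or when timeLimit$ emits, whichever comes first.
function pollUntilFinalized(
  ticks$: Observable<unknown>,
  doRequest: () => Observable<MyType>,
  timeLimit$: Observable<unknown>
): Observable<MyType> {
  return ticks$.pipe(
    switchMap(() => doRequest()),
    // inclusive = true lets the FINALIZED result through before completing
    takeWhile(result => result.status !== 'FINALIZED', true),
    takeUntil(timeLimit$)
  );
}

// Demo with stand-ins for interval(1000), timer(10000) and the real request.
const ticks$ = new Subject<void>();
const timeLimit$ = new Subject<void>();
const responses: MyType[] = [
  { status: 'RUNNING' },
  { status: 'RUNNING' },
  { status: 'FINALIZED' },
  { status: 'RUNNING' },
];
let requestCount = 0;
const fakeRequest = (): Observable<MyType> => of(responses[requestCount++]);

const seen: string[] = [];
let done = false;
pollUntilFinalized(ticks$, fakeRequest, timeLimit$).subscribe({
  next: result => seen.push(result.status),
  complete: () => { done = true; },
});

// Four ticks are sent, but the pipeline completes on the third one.
ticks$.next();
ticks$.next();
ticks$.next();
ticks$.next();

console.log(seen, requestCount, done);
```

In the real component, ticks$ would be interval(1000), timeLimit$ would be timer(10000), and the loading flag could be reset in the complete callback or with finalize.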

DeborahK answered Oct 09 '22 10:10