import { timer } from 'rxjs/internal/observable/timer';
)OR
It's only stoping execution based on Time (using import { timer } from 'rxjs/internal/observable/timer';
)
The names of attributes, variables and their values have been changed for example purposes:
import { finalize } from 'rxjs/internal/operators/finalize';
import { interval } from 'rxjs/internal/observable/interval';
import { timer } from 'rxjs/internal/observable/timer';
import { takeUntil, first } from 'rxjs/operators';
import { merge, EMPTY, of } from 'rxjs';
.
. // Attributes and Class declaration here
.
async startProcess(): Promise<void> {
this.isProcessLoading = true;
const { someId } = await this.exampleService.execute().toPromise();
const interval$ = interval(1000);
const timeLimiter$ = timer(10000);
const request$ = this.exampleService.doRequest();
const isEventFinished$ = EMPTY;
// My goal here is for the result of this function to return an observable that notifies
// if the first parameter emits an event OR if the second parameter emits another. That is, I want to notify if any condition is valid
const stopConditions$ = merge(isEventFinished$, timeLimiter$);
const handleSuccess = (object: MyType) => {
if (object.status === 'FINALIZED') {
this.object = object;
isEventFinished$.subscribe();
}
};
const handleError = () => this.showErrorComponent = true;
interval$
.pipe(takeUntil(stopConditions$))
.pipe(finalize(() => this.isSimulationLoading = false))
.subscribe(() => request$.subscribe(handleSuccess, handleError));
}
The code "works" because the timeLimiter$
fires takeUntil
after 10s. However, I want the possibility to stop before the time limit...
I want takeUntil to be able to run from here too:
isEventFinished$.subscribe()
if the above snippet performed correctly, it should be stop the interval$, but it does not. That is my problem
I dont know if two pipes made any difference than use only one like this: .pipe(takeUntil(stopConditions$), finalize(() => this.isSimulationLoading = false))
. However, i already tried it and did not work
Already tried to replace this isEventFinished$
for: const isEventFinished$ = of(1)
and his subscription by: timeLimiter$.pipe(first()).subscribe()
.
But this does not works either. In reality, this prevents the request from being executed (I don't know why)
There is also a better way to unsubscribe from or complete Observables by using the takeUntil() operator. The takeUntil() operator emits the values emitted by the source Observable until a notifier Observable emits a value.
takeUntil subscribes and begins mirroring the source Observable. It also monitors a second Observable, notifier that you provide. If the notifier emits a value, the output Observable stops mirroring the source Observable and completes.
The takeUntil(notifier) keeps emitting the values until it is notified to stop. takeWhile(predicate) emits the value while values satisfy the predicate.
takeUntil passes values from the source observable to the observer (mirroring) until a provided observable known as the notifier emits its first value. The operator subscribes to the source observable and begins mirroring the source Observable. It also subscribes to the notifier observable.
I just tried this code with a Stackblitz and it "worked" ... but I'm not sure what, exactly, you are trying to do? I then made a few updates to better see what was going on.
See the Stackblitz here: https://stackblitz.com/edit/angular-takeuntil-deborahk
The key change:
const stopConditions$ = merge(this.isEventFinished$, timeLimiter$).pipe(
tap(s => console.log("stop", s))
);
interval$.pipe(takeUntil(stopConditions$)).subscribe({
next: handleSuccess,
error: handleError,
complete: () => {
this.isSimulationLoading = false;
console.log("isSimulationLoading", this.isSimulationLoading)
}
});
Does that help?
EDIT: I added a "Finished" button to emulate whatever action would cause the finish operation.
Define isEventFinished
as an Observable by declaring it as a Subject or BehaviorSubject (BehaviorSubject has a default value, Subject does not).
isEventFinished$ = new Subject<boolean>();
Then, whenever the finished event occurs, use the next
method to emit a value into the isEventFinished
stream.
this.isEventFinished$.next(true);
Then this code should work:
const stopConditions$ = merge(this.isEventFinished$, timeLimiter$).pipe(
tap(s => console.log("stop", s))
);
interval$.pipe(takeUntil(stopConditions$)).subscribe({
next: handleSuccess,
error: handleError,
complete: () => {
this.isSimulationLoading = false;
console.log("isSimulationLoading", this.isSimulationLoading);
}
});
See the updated blitz.
Does that work?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With