I created an observable that fires 3 seconds after the last change is made and calls the service's publishChange. It works, but I would like to create a doImmediateChange function that calls publishChange immediately and stops the debounced observable. How is that possible?
My component:
class MyComponent {
  private updateSubject = new Subject<string>();

  ngOnInit() {
    this.updateSubject.pipe(
      debounceTime(3000),
      distinctUntilChanged()
    ).subscribe(val => {
      this.srv.publishChange(val);
    });
  }

  doChange(val: string) {
    this.updateSubject.next(val);
  }

  doImmediateChange(val: string) {
    // Stop the current updateSubject if debounce is in progress and call publish immediately
    // ??
    this.srv.publishChange(val);
  }
}
A debounce() function forces another function to wait a certain amount of time before it can run again, which limits how often that function is called. For example, when a "Send Request" handler is debounced, a request is sent only once the wait has elapsed, regardless of how many times the user presses the button.
The RxJS debounce() operator is a filtering operator that emits a value from the source Observable only after a particular time span has passed without another source emission. That time span is determined by a second input, given as an Observable or Promise.
Bouncing is the tendency of any two metal contacts in an electronic device to generate multiple signals as the contacts close or open; debouncing is any kind of hardware device or software that ensures that only a single signal will be acted upon for a single opening or closing of a contact.
Debouncing is a good method for controlling events that require sporadic user actions such as typing in an input field or clicking a button. In the case of a search bar that makes API calls according to user input, implementing a debounce is a good way to reduce the number of calls made to the API.
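As a rough illustration of the idea, and of how a pending call can be dropped in favour of an immediate one, here is a minimal framework-free sketch in TypeScript. The names debounce, TimerApi, call, immediate, and cancel are hypothetical and chosen only for this example; the timer functions are injectable so the behaviour can be exercised without actually waiting.

```typescript
// Hypothetical helper, not part of RxJS or any library.
type TimerApi = {
  set: (fn: () => void, ms: number) => unknown;
  clear: (handle: unknown) => void;
};

// Default timers backed by setTimeout / clearTimeout.
const realTimers: TimerApi = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle as ReturnType<typeof setTimeout>),
};

function debounce<T>(
  fn: (value: T) => void,
  waitMs: number,
  timers: TimerApi = realTimers
) {
  let handle: unknown;
  let pending = false;

  // Drop the scheduled call, if there is one.
  const cancel = (): void => {
    if (pending) {
      timers.clear(handle);
      pending = false;
    }
  };

  return {
    // Schedule fn(value); every new call restarts the wait.
    call(value: T): void {
      cancel();
      pending = true;
      handle = timers.set(() => {
        pending = false;
        fn(value);
      }, waitMs);
    },
    // Discard any pending call and run fn(value) right away.
    immediate(value: T): void {
      cancel();
      fn(value);
    },
    cancel,
  };
}
```

With this sketch, call() plays the role of doChange and immediate() the role of doImmediateChange from the question: the pending value is discarded and only the immediate value is published.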
However, if you have been using RxJS for long enough, this is not a concept that seems natural to Observables. The pattern seems to almost dictate that Subscriptions are simply a “viewership” to the source and not something that controls the source.
Just unsubscribe:
import { interval } from 'rxjs';
const subscription = interval(1000).pipe(...).subscribe();
...
subscription.unsubscribe();
Note that since interval() is asynchronous you can call unsubscribe() inside the subscribe's callback as well. Jul 2019: Updated for RxJS 6.
Unsubscribing or Cancelling. One of the key features of Observables is their ability to cancel the underlying operation of the source. In fact, the ReactiveX documentation describes this as the primary purpose of the Subscription: "Subscription: represents the execution of an Observable, is primarily useful for cancelling the execution."
You'll of course need to set that other observable up to emit values in a way that is useful to you, but here's an example that stops an interval after 5 seconds:
Rx.Observable.interval(100)
  .takeUntil(Rx.Observable.timer(5000))
  .subscribe(val => console.log(val));
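The snippet above uses the older chained RxJS 5 syntax. A minimal sketch of the same two cancellation mechanisms in the pipeable style, assuming RxJS 6 or later, might look like this. Subjects are used instead of timers so the behaviour is synchronous and easy to follow; the names source$, stop$, received, and manual are illustrative only.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const stop$ = new Subject<void>();

const received: number[] = [];
let completed = false;

// 1) Declarative cancellation: takeUntil completes the stream
//    as soon as the notifier (stop$) emits.
source$.pipe(takeUntil(stop$)).subscribe({
  next: (val) => received.push(val),
  complete: () => { completed = true; },
});

// 2) Imperative cancellation: keep the Subscription and unsubscribe.
const manual: number[] = [];
const subscription = source$.subscribe((val) => manual.push(val));

source$.next(1);
source$.next(2);

stop$.next();               // ends the takeUntil subscription
subscription.unsubscribe(); // ends the manual subscription

source$.next(3);            // nobody is listening any more

console.log(received, completed, manual);
```

After this runs, both received and manual hold 1 and 2 only, and completed is true for the takeUntil variant. Note that unsubscribe() does not trigger the complete callback, while takeUntil does.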
Use the race operator:
The first observable to emit becomes the only observable subscribed to, so this recursive function completes after one emission (take(1)) and then resubscribes (() => this.raceRecursive()).
private timed$ = new Subject<string>();
private event$ = new Subject<string>();

ngOnInit() {
  this.raceRecursive();
}

raceRecursive() {
  race(
    this.timed$.pipe(debounceTime(1000)),
    this.event$
  )
    .pipe(take(1)) // force it to complete
    .subscribe({
      next: val => console.log(val), // srv call here
      error: err => console.error(err),
      complete: () => this.raceRecursive() // reset it once complete
    });
}

doChange(val: string) {
  this.timed$.next(val);
}

doImmediateChange(val: string) {
  this.event$.next(val);
}
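To see why the race approach drops the pending debounced value, here is a small synchronous sketch, assuming RxJS 6 or later. race mirrors whichever source emits first and unsubscribes from the other, and take(1) then completes the result. The debounceTime step is left out so the example needs no timers; seen and completed are illustrative names.

```typescript
import { Subject, race } from 'rxjs';
import { take } from 'rxjs/operators';

const timed$ = new Subject<string>();
const event$ = new Subject<string>();

const seen: string[] = [];
let completed = false;

race(timed$, event$)
  .pipe(take(1)) // complete after the winner's first value
  .subscribe({
    next: (val) => seen.push(val),
    complete: () => { completed = true; },
  });

event$.next('immediate'); // event$ emits first, so it wins the race
timed$.next('late');      // ignored: race already unsubscribed from timed$
event$.next('again');     // ignored: take(1) already completed

console.log(seen, completed);
```

Here seen ends up containing only 'immediate' and completed is true, which is the point at which the answer's raceRecursive() would subscribe again for the next round.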
You can emulate debounceTime using switchMap and delay. Then cancel the inner Observable with takeUntil to prevent a waiting value from being emitted.
private updateSubject = new Subject<string>();
private interrupt = new Subject<void>(); // typed as void so next() needs no argument

ngOnInit() {
  this.updateSubject.pipe(
    // each new value replaces the previous delayed one, like debounceTime
    switchMap(val => of(val).pipe(
      delay(3000),
      takeUntil(this.interrupt) // drop the waiting value on interrupt
    ))
  ).subscribe(val => publish(val));
}

doChange(val: string) {
  this.updateSubject.next(val);
}

doImmediateChange(val: string) {
  this.interrupt.next();
  publish(val);
}
https://stackblitz.com/edit/rxjs-ya93fb
You can achieve this behavior using debounce and race. With the code you provided:
private destroy$ = new Subject<void>();
private immediate$ = new Subject<void>();
private updateSubject$ = new Subject<string>();

constructor(private srv: PubSubService) {}

ngOnInit() {
  this.updateSubject$.pipe(
    takeUntil(this.destroy$),
    debounce(() => race(timer(3000), this.immediate$))
  ).subscribe(val => {
    this.srv.publishChange(val);
  });
}

doChange(val: string, immediate?: boolean) {
  this.updateSubject$.next(val);
  if (immediate) this.immediate$.next();
}

// don't forget to unsubscribe
ngOnDestroy() {
  this.destroy$.next();
}
Emitting an immediate change replaces the previous normal change (which is still being debounced for 3 seconds) and is published without the delay, because immediate$ wins the race against the timer and ends the debounce window early.