
How is it possible to stop a debounced Rxjs Observable?

I created an observable that fires 3 seconds after the last change and calls the service's publishChange. It works, but I would like to add a doImmediateChange function that calls publishChange immediately and cancels any debounced emission that is still pending. How can I do that?

My component:

class MyComponent {
    private updateSubject = new Subject<string>();

    ngOnInit() {
        this.updateSubject.pipe(
            debounceTime(3000),
            distinctUntilChanged()
        ).subscribe(val => {
            this.srv.publishChange(val);
        });
    }

    doChange(val: string) {
        this.updateSubject.next(val);
    }

    doImmediateChange(val: string) {

        // Stop the current updateSubject if debounce is in progress and call publish immediately
        // ??
        this.srv.publishChange(val);

    }

}
Iter Ator asked Nov 21 '19 11:11



3 Answers

Use the race operator:

race mirrors whichever source observable emits first and unsubscribes from the others. take(1) forces the raced observable to complete after that single emission, and the complete callback, () => this.raceRecursive(), subscribes again so the next change starts a fresh race.

private timed$ = new Subject<string>();
private event$ = new Subject<string>();

ngOnInit() {
  this.raceRecursive()
}

raceRecursive() {
  race(
    this.timed$.pipe(debounceTime(1000)),
    this.event$
  )
    .pipe(take(1)) // force it to complete after the first emission
    .subscribe({
      next: val => console.log(val), // srv call here
      error: err => console.error(err),
      complete: () => this.raceRecursive() // resubscribe once complete
    });
}

doChange(val: string) {
  this.timed$.next(val)
}

doImmediateChange(val: string) {
  this.event$.next(val)
}
Richard Dunn answered Sep 27 '22 18:09


You can emulate debounceTime using switchMap and delay. Then cancel the inner Observable with takeUntil to prevent a waiting value from being emitted.

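private updateSubject = new Subject<string>();
private interrupt = new Subject<void>(); // void, so next() can be called without a value

ngOnInit() {
  this.updateSubject.pipe(
    // each new value cancels the previous inner Observable, restarting the 3 s wait
    switchMap(val => of(val).pipe(
      delay(3000),
      takeUntil(this.interrupt) // drop the waiting value when interrupted
    ))
  ).subscribe(val => this.srv.publishChange(val));
}

doChange(val: string) {
  this.updateSubject.next(val);
}

doImmediateChange(val: string) {
  this.interrupt.next(); // cancel the pending delayed value, if any
  this.srv.publishChange(val);
}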

https://stackblitz.com/edit/rxjs-ya93fb
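
As a framework-free illustration of the same cancel-and-restart idea (this is not part of the original answer and does not use RxJS), here is a minimal sketch. The names DebouncedPublisher, TimerLike and realTimer are hypothetical, and the timer is injectable only so that a fake clock can drive it in tests.

```typescript
// Hypothetical timer abstraction so a fake clock can drive the sketch in tests.
interface TimerLike {
  set(fn: () => void, ms: number): unknown;
  clear(handle: unknown): void;
}

// Default timer backed by the platform's setTimeout/clearTimeout.
const realTimer: TimerLike = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle as ReturnType<typeof setTimeout>),
};

class DebouncedPublisher<T> {
  private handle: unknown = null; // null means nothing is waiting
  private publish: (value: T) => void;
  private delayMs: number;
  private timer: TimerLike;

  constructor(publish: (value: T) => void, delayMs: number, timer: TimerLike = realTimer) {
    this.publish = publish;
    this.delayMs = delayMs;
    this.timer = timer;
  }

  // Debounced path: every change restarts the wait (the role of switchMap + delay).
  change(value: T): void {
    this.cancel();
    this.handle = this.timer.set(() => {
      this.handle = null;
      this.publish(value);
    }, this.delayMs);
  }

  // Immediate path: drop any waiting value (the role of takeUntil), then publish now.
  immediate(value: T): void {
    this.cancel();
    this.publish(value);
  }

  // Cancel the waiting value, if there is one.
  cancel(): void {
    if (this.handle !== null) {
      this.timer.clear(this.handle);
      this.handle = null;
    }
  }
}
```

In the component from the question, doChange would call change(val) and doImmediateChange would call immediate(val), with the publish callback forwarding to this.srv.publishChange. Unlike the original pipeline, this sketch does not reproduce distinctUntilChanged.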

frido answered Sep 27 '22 16:09


You can achieve this behavior using debounce and race. Applied to the code you provided:

private destroy$ = new Subject<void>();
private immediate$ = new Subject<void>();
private updateSubject$ = new Subject<string>();

constructor(private srv: PubSubService) {}

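ngOnInit() {
  this.updateSubject$.pipe(
      // the debounce window ends after 3 s or as soon as immediate$ emits
      debounce(() => race(timer(3000), this.immediate$)),
      // takeUntil goes last: placed before debounce, destroy$ would complete
      // the source and debounce would flush the pending value on destroy
      takeUntil(this.destroy$)
  ).subscribe(val => {
      this.srv.publishChange(val);
  });
}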

doChange(val: string, immediate?: boolean) {
  this.updateSubject$.next(val);
  if (immediate) this.immediate$.next();
}

// don't forget to unsubscribe
ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Calling doChange with immediate set replaces any pending debounced change (normally delayed by 3 s) and publishes the new value without the delay, because immediate$ wins the race against the timer.

here's a working example

Ron answered Sep 27 '22 17:09