Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

CombineLatest of dynamic array inside switchMap is unsubscribing and re-subscribing continuously

Tags:

rxjs

I have at least two buttons that I want to dynamically listen for clicks on. listeningArray$ will emit an array (ar) of button #'s that I need to be listening to. When somebody clicks on one of these buttons I'm listening to, I need to console log that the button that was clicked and also log the value from a time interval.

If ar goes from [1,2] to [1], we need to stop listening to clicks on button #2. So the DOM click event needs to be removed for 2 and that should trigger the .finally() operator. But for 1, we should remain subscribed and the code inside the .finally() should not run, since nothing is being unsubscribed.

const obj$ = {};

Rx.Observable.combineLatest(
  Rx.Observable.interval(2000),
  listeningArray$ // Will randomly emit either [1] or [1,2]
)
.switchMap(([x, ar]) => {
    const observables = [];

    ar.forEach(n => {
        let nEl = document.getElementById('el'+n);

        obj$[n] = obj$[n] || Rx.Observable.fromEvent(nEl, 'click')
          .map(()=>{
            console.log(' el' + n);
          })
          .finally(() => {
            console.log(' FINALLY_' + n);
          });

        observables.push(obj$[n]);
    })

    return Rx.Observable.combineLatest(...observables);
})
.subscribe()

But what's happening is every time the interval emits a value, the DOM events ALL get removed and then immediately get added on again, and the code inside the .finally operator runs for 1 and 2.

This is really frustrating me. What am I missing?

It's a bit of a complex situation, so I created this: https://jsfiddle.net/mfp22/xtca98vx/7/

like image 456
Michael Pearson Avatar asked Sep 13 '25 07:09

Michael Pearson


2 Answers

I was actually really close, but I misunderstood the point of switchMap.

switchMap is designed to unsubscribe from the observable it returns whenever a new value is emitted from above. This is why it can be used to cancel old pending Http requests when a new request needs to be made instead.

The problem I was having is to be expected. switchMap will unsubscribe from the previously returned observable before subscribing to the current one. This was unacceptable, as I explained in the question. The reason this was unacceptable was that in my actual project, the fromEvent observables were listening to Firebase child_added events, so when these cold observables went from having no subscribers to having 1 subscriber, Firebase would subsequently fire the event for every child already existing, as well as for future ones added.

I played with mergeMap for a while, but it was really difficult and buggy to manually have to unsubscribe from previously returned observables.

So I added a subscriber for the inner observables while switchMap was doing its process of unsubscribe from old => subscribe to new so that there would always be a subscriber. I used takeUntil(Observable.timer(0)) to make sure the subscribers didn't build up and cause a memory leak.

There may be a better solution, but this was the best one I found.

const obj$ = {};

Rx.Observable.combineLatest(
  Rx.Observable.interval(2000),
  listeningArray$ // Will randomly emit either [1] or [1,2]
)
.switchMap(([x, ar]) => {
  const observables = [];

  ar.forEach(n => {
    let nEl = document.getElementById('el'+n);

    obj$[n] = obj$[n] || Rx.Observable.fromEvent(nEl, 'click')
      .map(()=>{
        console.log(' el' + n);
      })
      .finally(() => {
        console.log(' FINALLY_' + n);
      })
      .share();
    
    obj$[n].takeUntil(Rx.Observable.timer(0))
      .subscribe();
    observables.push(obj$[n]);
  })

  return Rx.Observable.combineLatest(...observables);
})
.subscribe()

I also had to add the .share() method. I was going to need it anyway. I'm using this pattern to let some Angular components declare what data they need, ignoring what other components might want, to achieve a better separation of concerns. So multiple components can subscribe to the same Firebase observables, but the .share() operator ensures that each message from Firebase is only handled once (I'm dispatching actions to a Redux store for each one).

Working solution: https://jsfiddle.net/mfp22/xtca98vx/8/

like image 167
Michael Pearson Avatar answered Sep 15 '25 04:09

Michael Pearson


State in FRP is immutable. Thus when you switchMap to the second emission the previous observable combineLatest containing [1,2] will get unsubscribed and the finally operator invoked. Before subscribing to the next containing only [1]

If you only want to unsubscribe from one button you can store state in the DOM (add atr to button) and use filter to ignore button. Or you can add a TakeWhile() to every button dictating when it should be unsubscribed so it can invoke it's own finally()

like image 29
Mark van Straten Avatar answered Sep 15 '25 05:09

Mark van Straten



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!