Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS: making a buffer emit after certain event

Problem: I would like to create Rxjs composition ( function chain ) that would result in buffering values from one Observable until certain event occurs then emmiting all buffered values synchroniously and then buffering until next event.

I use this function to gather all http requests that have to wait until my application make a call for authorization. And then run all those requests. ( Its implemented inside Angular4 HttpClient Interceptor ), thats my usecase but I generally seek solution on how to create such rx chain.

Why Rxjs buffer is not enoyugh. From what I read and tested buffer either requires exact time frames, or in case of getting a scheduler instead of time as parameter, its resubscribes to scheduler after detecting last scheduler`s "event" propagation. And I would like it work like this: When there appears first request I start to buffer and then subscribe to scheduler, after scheduler emits, a stop buffering, reemit all buffered values, and wait until next new request is made to start buffering again and to launch scheduler again.

Right now my solution is using helper object thats either undefined or my observable, with code roughly as follows:

private observable: Observable<boolean>;

makeRequest(): Observable<boolean> {
    if (this.observable !== void 0) {
        return this.observable;
    } else {
        this.observable = this.authenticationReuqest()
            .share()
            .finally(() => this.observable = void 0);

        return this.observable;
    }
}

In that way I kind of buffer my requests, by maiking them .delay() until same multicasted observable emits, and after it emits I just clean it up (though no need for unsubscribing as its cleans up on finally so after completing or erroring).

If anyone has an idea or patter how to replace this solution with pure Rxjs I am intrested. I have some feeling that some combination of Buffer and Zip could make it happen though I cant get the exact solution.

Thanks Tomasz

like image 497
Tomasz Stelągowski Avatar asked Aug 09 '17 13:08

Tomasz Stelągowski


1 Answers

I ran into exactly the same problem, I have events coming in like this

enter image description here

I kick off loading events early so that we don't need to wait. But the problem is, only after one point (the "rdy", stands for ready) I want to process these events. So the ready event needs to come first. I need to hold the values until ready comes and reemit them. Like this

enter image description here

Here's what I do, I use multicast to share the same subscription in downstream. Create a ready observable in the selector function filtering the first ready event. Then merge the ready observable, and buffer of the multicasted events, take one, and switchMap back from array to values. Finally concat the multicasted events. Here's the code

const events = [
  'ev1',
  'ev2',
  'ready',
  'ev3',
  'ev4',
  'ev5'
]
Rx.Observable
  .interval(500)
  .take(events.length)
  .map(i => events[i])
  .multicast(
      () => new Rx.Subject(),
      events$ => {
          const ready$ = events$
            .filter(event => event === 'ready')
            .take(1)
          return Rx.Observable
            .merge(
                ready$,
                events$
                  .buffer(ready$)
                  .take(1)
                  .switchMap(events => Rx.Observable.from(events))
            )
                .concat(events$)
      }
  )

Or you can find it and run it on RxViz here https://rxviz.com/v/j8nNpe8N It's not pretty, but it works. I am still looking for a better way to do it, any idea?

like image 59
Fang-Pen Lin Avatar answered Nov 08 '22 13:11

Fang-Pen Lin