Problem: I would like to create Rxjs composition ( function chain ) that would result in buffering values from one Observable until certain event occurs then emmiting all buffered values synchroniously and then buffering until next event.
I use this function to gather all http requests that have to wait until my application make a call for authorization. And then run all those requests. ( Its implemented inside Angular4 HttpClient Interceptor ), thats my usecase but I generally seek solution on how to create such rx chain.
Why Rxjs buffer is not enoyugh. From what I read and tested buffer either requires exact time frames, or in case of getting a scheduler instead of time as parameter, its resubscribes to scheduler after detecting last scheduler`s "event" propagation. And I would like it work like this: When there appears first request I start to buffer and then subscribe to scheduler, after scheduler emits, a stop buffering, reemit all buffered values, and wait until next new request is made to start buffering again and to launch scheduler again.
Right now my solution is using helper object thats either undefined or my observable, with code roughly as follows:
private observable: Observable<boolean>;
makeRequest(): Observable<boolean> {
if (this.observable !== void 0) {
return this.observable;
} else {
this.observable = this.authenticationReuqest()
.share()
.finally(() => this.observable = void 0);
return this.observable;
}
}
In that way I kind of buffer my requests, by maiking them .delay() until same multicasted observable emits, and after it emits I just clean it up (though no need for unsubscribing as its cleans up on finally so after completing or erroring).
If anyone has an idea or patter how to replace this solution with pure Rxjs I am intrested. I have some feeling that some combination of Buffer and Zip could make it happen though I cant get the exact solution.
Thanks Tomasz
I ran into exactly the same problem, I have events coming in like this
I kick off loading events early so that we don't need to wait. But the problem is, only after one point (the "rdy", stands for ready) I want to process these events. So the ready event needs to come first. I need to hold the values until ready comes and reemit them. Like this
Here's what I do, I use multicast
to share the same subscription in downstream. Create a ready observable in the selector function filtering the first ready event. Then merge the ready observable, and buffer of the multicasted events, take one, and switchMap back from array to values. Finally concat the multicasted events. Here's the code
const events = [
'ev1',
'ev2',
'ready',
'ev3',
'ev4',
'ev5'
]
Rx.Observable
.interval(500)
.take(events.length)
.map(i => events[i])
.multicast(
() => new Rx.Subject(),
events$ => {
const ready$ = events$
.filter(event => event === 'ready')
.take(1)
return Rx.Observable
.merge(
ready$,
events$
.buffer(ready$)
.take(1)
.switchMap(events => Rx.Observable.from(events))
)
.concat(events$)
}
)
Or you can find it and run it on RxViz here https://rxviz.com/v/j8nNpe8N It's not pretty, but it works. I am still looking for a better way to do it, any idea?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With