How to get the Out observable from Data and Gates?
Another way to conditionally delay data$ is to use delayWhen() such:
const gate$ = new BehaviorSubject<boolean>(false);
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$
.pipe(delayWhen(triggerF))
.subscribe( (v) => console.log(v));
// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);
For what I understand, you need data$
when gates$
emits true, and buffering of data$
otherwise, ending when gates$
emits true again, so sth like :
out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))
Hypothesis : data$
, gates$
are hot streams (cf. what that means here Hot and Cold observables : are there 'hot' and 'cold' operators?).
This is not tested, but try it, and let us know if it indeed worked (or prove it with code as you say :-). The logic looks ok, I am just unsure about the re-entrant gates$
. Hopefully the inner gates$ suscription from buffer
fires before the outer one. If that does not happen you will see a pause in the emission of data corresponding to network downtime.
Alright, if that does not work, then the standard solution with scan
will. The behavior which you seek can be expressed as a (tiny) state machine, with two states : passthrough
and buffering
. You can implement all such state machines with scan
.
Here goes scan
solution : https://jsfiddle.net/1znvwyzc/
const gates$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.map(x => ({gates: x}))
.share()
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.map(x => ({data: x}))
.share()
const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
if (acc.controlState === 'passthrough'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'passthrough',
bufferedData : [],
out : val.data
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates passing from true to true -> no changes to perform
return {
controlState : 'passthrough',
bufferedData : [],
out : null
}
} else {
// gates passing from true to false, switch control state
return {
controlState : 'buffered',
bufferedData : [],
out : null
}
}
}
}
if (acc.controlState === 'buffered'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'buffered',
bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
out : null
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates from false to true -> switch control state and pass the buffered data
return {
controlState : 'passthrough',
bufferedData : [],
out : acc.bufferedData
}
} else {
// gates from false to false -> nothing to do
return {
controlState : 'buffered',
bufferedData : acc.bufferedData,
out : null
}
}
}
}
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))
out$.subscribe(_ => console.log(_))
You can see the exact same technique used here : How do I conditionally buffer key input based on event in RxJs
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With