
Conditional emission delays with rxjs

Tags:

a-thousand-words

From picture to code?

How to get the Out observable from Data and Gates?

  • Data is an observable of any kind e.g. JSON objects to be sent to a remote backend
  • Gates is a boolean observable, where the ticks correspond to true and the crosses to false. For example, Internet connectivity whereby true means the network became accessible and false reflects a disconnection.
  • Out is the resulting observable, which emits the same values as Data, sometimes immediately and sometimes with a delay, depending on the most recent gate value. For instance, I could subscribe to Out in order to post the emitted JSON objects to a remote API while connected to the Internet.
Cel asked May 11 '17 10:05



2 Answers

Another way to conditionally delay data$ is to use delayWhen(), like so:

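import { BehaviorSubject } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

// gate$ replays its latest value to each new subscriber; it starts closed (false)
const gate$ = new BehaviorSubject<boolean>(false);
// each value waits for the next true from gate$ (immediate if the gate is already open)
const triggerF = _ => gate$.pipe(filter(v => v));
// data$ is the source observable from the question
const out$ = data$.pipe(delayWhen(triggerF));
out$.subscribe(v => console.log(v));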

// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);
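
To see what this gating amounts to without running RxJS, here is a minimal dependency-free sketch. The names createGate, push and set are hypothetical and chosen only for illustration; they are not RxJS APIs, and this is not how delayWhen() is implemented. It only mirrors the behaviour described above: values are held while the gate is closed and released in order once it opens.

```javascript
// Hypothetical helper (not an RxJS API): holds values while closed,
// releases them in arrival order when the gate opens.
function createGate(onValue, initiallyOpen = false) {
  let open = initiallyOpen;
  const pending = [];
  return {
    // Called for each data value: forward now if open, otherwise queue it.
    push(value) {
      if (open) onValue(value);
      else pending.push(value);
    },
    // Called for each gate value: on true, flush everything queued so far.
    set(isOpen) {
      open = isOpen;
      while (open && pending.length > 0) onValue(pending.shift());
    },
  };
}

const received = [];
const gate = createGate(v => received.push(v)); // starts closed, like BehaviorSubject(false)
gate.push('a');   // held
gate.push('b');   // held
gate.set(true);   // releases 'a', then 'b'
gate.push('c');   // passes straight through
gate.set(false);
gate.push('d');   // held until the gate reopens
console.log(received); // ['a', 'b', 'c']
```

In the RxJS version, gate.set corresponds to gate$.next and gate.push to an emission of data$.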
Sergey Kurganov answered Oct 19 '22 23:10



From what I understand, you need data$ to pass through while gates$ is true and to be buffered otherwise, with the buffer released when gates$ emits true again, so something like:

out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))

Assumption: data$ and gates$ are hot streams (for what that means, see "Hot and Cold observables: are there 'hot' and 'cold' operators?").

This is not tested, so try it and let us know whether it works (or prove it with code, as you say :-). The logic looks OK; I am just unsure about the re-entrant gates$. Hopefully the inner gates$ subscription from buffer fires before the outer one. If it does not, you will see a pause in the emission of data corresponding to the network downtime.

Alright, if that does not work, then the standard solution with scan will. The behavior you are after can be expressed as a (tiny) state machine with two states: passthrough and buffering. You can implement any such state machine with scan.

Here is the scan solution: https://jsfiddle.net/1znvwyzc/

const gates$ = Rx.Observable.interval(2000)
                            .map(_ => Math.random() >= 0.5)
                            .map(x => ({gates: x}))
                            .share()

const data$ = Rx.Observable.interval(500)
                           .map(_ => "data"+ _)
                           .map(x => ({data: x}))                           
                           .share()

const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
  if (acc.controlState === 'passthrough'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'passthrough',
        bufferedData : [],
        out : val.data
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates passing from true to true -> no changes to perform
        return {
          controlState : 'passthrough',
          bufferedData : [],
          out : null
        }
      } else {
        // gates passing from true to false, switch control state
        return {
          controlState : 'buffered',
          bufferedData : [],
          out : null
        }
      }
    }
  }
  if (acc.controlState === 'buffered'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'buffered',
        bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
        out : null              
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates from false to true -> switch control state and pass the buffered data
        return {
          controlState : 'passthrough',
          bufferedData : [],
          out : acc.bufferedData              
        }
      } else {
        // gates from false to false -> nothing to do
        return {
          controlState : 'buffered',
          bufferedData : acc.bufferedData,
          out : null                    
        }
      }
    }
  }
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))

out$.subscribe(_ => console.log(_))   
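
Because the fiddle drives the gates with Math.random(), its output changes on every run. As a rough way to check the transitions deterministically, the same two-state logic can be folded over a fixed list of events in plain JavaScript. This is only a sketch: the step function, its state shape (buffering, buffer, out) and the event timeline are made up for illustration and are not part of the code above.

```javascript
// Hypothetical pure reducer mirroring the transitions of the scan callback.
// state: { buffering: boolean, buffer: any[], out: any[] }
function step(state, event) {
  if ('data' in event) {
    return state.buffering
      ? { buffering: true, buffer: [...state.buffer, event.data], out: [] }
      : { buffering: false, buffer: [], out: [event.data] };
  }
  // Otherwise it is a gate event: { gates: boolean }
  if (event.gates) {
    // Gate open: flush whatever was buffered (empty if already passing through).
    return { buffering: false, buffer: [], out: state.buffer };
  }
  // Gate closed: keep (or start) buffering and emit nothing.
  return { buffering: true, buffer: state.buffer, out: [] };
}

// A fixed, made-up timeline instead of interval() and Math.random().
const events = [
  { data: 'data0' },
  { gates: false },
  { data: 'data1' },
  { data: 'data2' },
  { gates: false },
  { gates: true },
  { data: 'data3' },
];

// Fold over the events, collecting every emitted value in order.
const emitted = [];
let state = { buffering: false, buffer: [], out: [] };
for (const event of events) {
  state = step(state, event);
  emitted.push(...state.out);
}
console.log(emitted); // ['data0', 'data1', 'data2', 'data3']
```

Unlike the scan callback above, this version copies the buffer instead of pushing into the accumulator, so each state can be inspected on its own.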

You can see the exact same technique used here: "How do I conditionally buffer key input based on event in RxJs".

user3743222 answered Oct 20 '22 01:10
