
RXJS Observable stretch

I have an Rx.Observable.webSocket Subject. My server endpoint cannot handle messages that arrive at nearly the same time (less than 25 ms apart). I need a way to stretch out the next() calls on my WebSocket subject.

I created another Subject, requestSubject, and subscribed to it. Inside the subscription I call next() on the WebSocket subject.

requestSubject.delay(1000).subscribe((request) => {
  console.log(`SENDING: ${JSON.stringify(request)}`);
  socketServer.next(JSON.stringify(request));
});

Using delay shifts every next() call by the same amount, so calls that arrived together are still emitted together, just later. That's not what I want.

I tried delay, throttle, and debounce, but none of them fit.

The following should illustrate my problem:

Stream 1 | ---1-------2-3-4-5---------6----

    after some operation ...

Stream 2 | ---1-------2----3----4----5----6-
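
The difference between the two behaviours can be sketched in plain JavaScript, without RxJS. This is only a timing model under stated assumptions: the helper names `delayAll` and `spaceOut` and the sample arrival times are made up for illustration and do not come from any library.

```javascript
// Hypothetical helpers, for illustration only; they model timing, not RxJS itself.

// What delay(ms) does: every emission is shifted by the same amount,
// so the gaps between emissions stay exactly as they were.
function delayAll(arrivals, ms) {
  return arrivals.map(t => t + ms);
}

// What is wanted: each emission goes out as soon as possible, but never
// less than minGapMs after the previous one.
function spaceOut(arrivals, minGapMs) {
  const out = [];
  for (const t of arrivals) {
    const earliest = out.length === 0 ? t : out[out.length - 1] + minGapMs;
    out.push(Math.max(t, earliest));
  }
  return out;
}

const arrivals = [0, 800, 810, 820, 830, 2000]; // made-up times in ms
console.log(delayAll(arrivals, 1000)); // [1000, 1800, 1810, 1820, 1830, 3000]
console.log(spaceOut(arrivals, 500));  // [0, 800, 1300, 1800, 2300, 2800]
```

With `delayAll` the burst at 800 to 830 ms is still a burst, while `spaceOut` spreads it out like Stream 2.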
Pascal asked Oct 17 '22 09:10



1 Answer

Had to tinker a bit; it's not as easy as it looks:

// Example source stream: each value is emitted after that many milliseconds
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
  .mergeMap(i => Rx.Observable.of(i).delay(i))
  .share();

stretchEmissions(source, 1000)
  .subscribe(val => console.log(val));

function stretchEmissions(source, spacingDelayMs) {
  return source
    .timestamp()
    .scan((acc, curr) => {
      // calculate delay needed to offset next emission
      let delay = 0;
      if (acc !== null) {
        const timeDelta = curr.timestamp - acc.timestamp;
        delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
      }
  
      return {
        timestamp: curr.timestamp,
        delay: delay,
        value: curr.value
      };
    }, null)
    .mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

The idea is to calculate the delay needed between emissions so they can be spaced out. timestamp() records when each original emission arrived, scan turns the gap to the previous emission into a delay, and the mergeMap overload with a concurrency of 1 subscribes to the next delayed value only after the previous one has been emitted. This is a pure Rx solution without further side effects.
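
To see what timing this produces, the pipeline can be traced by hand. The following is a plain-JavaScript model of it, assuming ideal timers and ignoring scheduler jitter; `computeDelays` and `emissionTimes` are hypothetical helper names used only for this trace, not RxJS functions.

```javascript
// Plain-JavaScript model of the pipeline above, assuming ideal timers.
// Both helper names are hypothetical and exist only for this trace.

// Mirrors the scan() reducer: the delay is based on the gap between
// the arrival timestamps of consecutive source emissions.
function computeDelays(arrivals, spacingDelayMs) {
  return arrivals.map((t, i) => {
    if (i === 0) return 0;
    const timeDelta = t - arrivals[i - 1];
    return timeDelta > spacingDelayMs ? 0 : spacingDelayMs - timeDelta;
  });
}

// Mirrors mergeMap(..., undefined, 1): a delayed inner observable starts
// when its value arrives or when the previous inner one has emitted,
// whichever is later.
function emissionTimes(arrivals, spacingDelayMs) {
  const delays = computeDelays(arrivals, spacingDelayMs);
  const out = [];
  arrivals.forEach((t, i) => {
    const start = i === 0 ? t : Math.max(t, out[i - 1]);
    out.push(start + delays[i]);
  });
  return out;
}

console.log(emissionTimes([100, 500, 1500, 1501, 1502, 1503], 1000));
// [100, 1100, 1500, 2500, 3499, 4498] under this idealized model
```

In this trace the burst at 1500 to 1503 ms is spread out as intended, but the value from 1500 leaves only 400 ms after the value from 500, because the delay is derived from arrival times rather than from when the previous value was actually emitted. If a strict minimum gap matters, one option (an assumption, not part of the code above) is to keep the projected emission time in the scan accumulator instead of the arrival timestamp.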

Mark van Straten answered Oct 21 '22 00:10
