Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RXJS: How can I generate a stream of numbers at random intervals (within a specified range)?

I want to use RXJS to set up an ORDERED data stream that emits a number at a random interval (say every 1-5 seconds) which I want to use as a time-randomized data source for testing other parts of RXJS. The following code is generating the items in a random order (due to the delay) but I would like the order preserved only the time randomized.

function randomDelay(bottom, top) {
  return Math.floor( Math.random() * ( 1 + top - bottom ) ) + bottom;
}

var source = Rx.Observable
  .range(1, 10)
  .flatMap(function (x) {
    return Rx.Observable
      .of(x)
      .delay(randomDelay(1000,5000));
  })
 .timeInterval();

var subscription = source.subscribe(
  function (x) {
     $("#result").append('Next: ' +  JSON.stringify(x) +  '<br>');
  },
  function (err) {
     $("#result").append('Error: ' + err);
  },
  function () {
     $("#result").append('Completed');
  });

is giving me variants of the following output:

Next: {"value":1,"interval":1229}
Next: {"value":2,"interval":321}
Next: {"value":4,"interval":645}
Next: {"value":5,"interval":28}
Next: {"value":9,"interval":728}
Next: {"value":10,"interval":269}
Next: {"value":3,"interval":107}
Next: {"value":6,"interval":265}
Next: {"value":8,"interval":1038}
Next: {"value":7,"interval":199}
like image 271
Laurence Fass Avatar asked Jan 28 '16 11:01

Laurence Fass


2 Answers

Use concatMap instead of flatMap.

Documentation here: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/concatmap.md

var source = Rx.Observable
  .range(1, 10)
  .concatMap(function (x) {
    return Rx.Observable
      .of(x)
      .delay(randomDelay(1000,5000));
  })
 .timeInterval();
like image 193
user3743222 Avatar answered Sep 21 '22 14:09

user3743222


I just used this question as my base to another and had to update it to RxJs 6 if anyone is interested.

const { range, of } = rxjs;
const { concatMap, delay } = rxjs.operators;

range(1, 10).pipe(
  concatMap(i => of(i).pipe(delay(1000 + (Math.random() * 4000))))
).subscribe(val => { console.log(val); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
like image 43
Adrian Brand Avatar answered Sep 18 '22 14:09

Adrian Brand