How are asynchronous streams transmitted in RxJS?

I'm trying to understand how a stream is transmitted through the pipe in RxJS.
I know this shouldn't be a concern, because that's the whole idea of async streams, but there is still something I want to understand.

Consider this code:

var source = Rx.Observable
    .range(1, 3)
    .flatMapLatest(function (x) {  //`switch` these days...
        return Rx.Observable.range(x*100, 2);
    });


source.subscribe(value => console.log('I got a value ', value));

Result:

I got a value 100
I got a value 200
I got a value 300
I got a value 301

I believe (IIUC) that the diagram is something like this (note the struck-through 101 and 201, which are unsubscribed):

----1---------2------------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------

Question:

Is it always guaranteed that 2 will arrive before (101), and likewise that 3 will arrive before (201)?

I mean, if I'm not supposed to rely on the timeline, then it is perfectly legal for the following diagram to occur:

----1---------------2---------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------

Here 2 arrived with a slight delay, after 101 had already been emitted.

What am I missing here? How does the pipe work in this case?

Royi Namir asked Oct 30 '22 09:10


1 Answer

For this particular Observable chain and this particular RxJS version, the order of emissions will always be the same.

In RxJS 4, range uses the currentThread scheduler by default, as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
All schedulers (except the immediate scheduler in RxJS 4) internally use some kind of queue, so the order is always the same.

The order of events is very similar to what you showed in the diagram (or at least I think it is):

  1. 1 is scheduled and emitted because it's the only action in the queue.
  2. 100 is scheduled. At this point there are no other actions in the scheduler's queue because 2 hasn't been scheduled yet. The RangeObservable schedules its next emission recursively only after it calls onNext(), which means that 100 is scheduled before 2.
  3. 2 is scheduled.
  4. 100 is emitted and 101 is scheduled.
  5. 2 is emitted and 101 is disposed.
  6. ... and so on.
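
The steps above can be reproduced with a small plain-JavaScript sketch. This is not RxJS code: createQueueScheduler, range and runQueued are hypothetical helpers written only for illustration, and the sketch assumes a simple first-in, first-out queue rather than the real currentThread scheduler implementation.

```javascript
// Toy trampoline (an assumption, not the RxJS 4 scheduler itself):
// work is queued and drained one item at a time, in insertion order.
function createQueueScheduler() {
  const queue = [];
  let running = false;
  return function schedule(work) {
    queue.push(work);
    if (running) return; // already draining: only enqueue
    running = true;
    while (queue.length > 0) queue.shift()();
    running = false;
  };
}

// Toy range: emits one value per queued action, then queues the next step
// only after onNext() has returned.
function range(schedule, start, count, onNext) {
  let disposed = false;
  const step = (i) => {
    if (disposed || i >= count) return; // a disposed step becomes a no-op
    onNext(start + i);
    schedule(() => step(i + 1));
  };
  schedule(() => step(0));
  return () => { disposed = true; };
}

// Toy "switch": every outer value disposes the previous inner range.
function runQueued() {
  const schedule = createQueueScheduler();
  const received = [];
  let disposeInner = () => {};
  range(schedule, 1, 3, (x) => {
    disposeInner();
    disposeInner = range(schedule, x * 100, 2, (v) => received.push(v));
  });
  return received;
}

console.log(runQueued()); // → [ 100, 200, 300, 301 ]
```

Because the inner range queues its first step before the outer range queues its next one, 100 always runs before 2, while the step that would emit 101 is still waiting in the queue when 2 disposes it.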

Note that this behavior is different in RxJS 4 and RxJS 5.

In RxJS 5, most Observables and operators don't use any Scheduler by default (the obvious exceptions are Observables and operators that need to work with delays). So in RxJS 5 the RangeObservable doesn't schedule anything and starts emitting values right away in a loop.

The same example in RxJS 5 produces a different result:

import { Observable } from 'rxjs/Rx'; // RxJS 5 entry point with all operators patched in

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2);
  });

source.subscribe(value => console.log('I got a value ', value));

This will print the following:

I got a value  100
I got a value  101
I got a value  200
I got a value  201
I got a value  300
I got a value  301
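
Why nothing is dropped here can be shown with a plain-JavaScript sketch, under the assumption that a scheduler-less range behaves like an ordinary loop. syncRange and runSync are hypothetical names used only for this illustration, not RxJS APIs.

```javascript
// Toy synchronous range: emits every value in a plain loop, with no queue.
function syncRange(start, count, onNext) {
  for (let i = 0; i < count; i++) onNext(start + i);
}

function runSync() {
  const received = [];
  syncRange(1, 3, (x) => {
    // The inner loop finishes before the outer loop produces its next value,
    // so there is never a pending inner emission left to cancel.
    syncRange(x * 100, 2, (v) => received.push(v));
  });
  return received;
}

console.log(runSync()); // → [ 100, 101, 200, 201, 300, 301 ]
```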

However, this changes significantly if you add, for example, delay(0). Common sense suggests that this shouldn't change anything:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2).delay(0);
  });

source.subscribe(value => console.log('I got a value ', value));

Now only the inner RangeObservable is scheduled, and it is disposed and re-created several times, so only values from the very last RangeObservable are emitted:

I got a value  300
I got a value  301
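
A rough way to picture this, assuming that delay(0) behaves like handing each value to a zero-delay timer: the outer loop runs to completion before any timer fires, so the timers of the first two inner ranges are cleared in time. syncRange, delayedRange and runDelayed are hypothetical helpers, and the real delay operator is implemented differently.

```javascript
// Toy synchronous range, as in a scheduler-less loop.
function syncRange(start, count, onNext) {
  for (let i = 0; i < count; i++) onNext(start + i);
}

// Toy delayed range: each value is deferred with setTimeout(…, 0).
// Returns a disposer that clears all timers that have not fired yet.
function delayedRange(start, count, onNext) {
  const timers = [];
  syncRange(start, count, (v) => {
    timers.push(setTimeout(() => onNext(v), 0));
  });
  return () => timers.forEach((t) => clearTimeout(t));
}

function runDelayed(done) {
  const received = [];
  let disposeInner = () => {};
  syncRange(1, 3, (x) => {
    disposeInner(); // clears the previous inner range's pending timers
    disposeInner = delayedRange(x * 100, 2, (v) => received.push(v));
  });
  // Report once the zero-delay timers have had a chance to fire.
  setTimeout(() => done(received), 10);
}

runDelayed((received) => console.log(received)); // → [ 300, 301 ]
```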
martin answered Nov 14 '22 01:11