I'm trying to understand how the stream is transmitted through the pipe in RXjs.
I know that this should not be a concern because that's the whole idea with async streams - but still there's something I want to understand.
Looking at this code :
var source = Rx.Observable
.range(1, 3)
.flatMapLatest(function (x) { //`switch` these days...
return Rx.Observable.range(x*100, 2);
});
source.subscribe(value => console.log('I got a value ', value))
Result :
I got a value 100
I got a value 200
I got a value 300
I got a value 301
I believe (IIUC) that the diagram is something like this : (notice striked 101,201 which are unsubscribed)
----1---------2------------------3------------------------------|
░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------
And here is the question:
Question:
Is it always guaranteed that 2 will arrive before the (101) ? same as that 3 is arriving before (201) ?
I mean - if I'm not suppose to look at a time line so it is perfectly legal for the following diagram to occur :
----1---------------2---------------3------------------------------|
░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------
Where 2 arrived with a slight delay where 101 was already emitted
What am I missing here? How does the pipe work here ?
For this particular Observable chain with this particular RxJS version the order of emissions is going to always be the same.
As already mentioned, in RxJS 4 it uses the currentThread scheduler as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
All schedulers (except the immediate from RxJS 4) are internally using some type of a queue so the order is always the same.
The order of events is very similar to what you showed in the diagram (... or at least I think it is):
1 is scheduled and emitted because it's the only action in the queue.100 is scheduled. At this point there are no more action in the Scheduler's queue because 2 hasn't been scheduled yet. The RangeObservable schedules another emission recursively after it calls onNext(). This means that 100 is scheduled before 2.2 is scheduled.100 is emitted, 101 is scheduled2 is emitted, 101 is disposed.Note that this behavior is different in RxJS 4 and RxJS 5.
In RxJS 5 most Observables and operators by default don't use any Scheduler (an obvious exception are Observables/operator that need to work with delays). So in RxJS 5 the RangeObservable won't schedule anything and start emitting values right away in a loop.
The same example in RxJS 5 will produce different result:
const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2);
});
source.subscribe(value => console.log('I got a value ', value));
This will print the following:
I got a value 100
I got a value 101
I got a value 200
I got a value 201
I got a value 300
I got a value 301
However, this will change significantly if you add for example delay(0). The common sense suggests that this shouldn't do anything:
const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2).delay(0);
});
source.subscribe(value => console.log('I got a value ', value));
Now only the inner RangeObservable is scheduled and disposed all over again several times which makes it emit only values from the very the last RangeObservable:
I got a value 300
I got a value 301
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With