I'm trying to understand how values are transmitted through the pipe in RxJS.
I know that this should not be a concern, because that's the whole idea of async streams, but there's still something I want to understand.
Looking at this code:
var source = Rx.Observable
  .range(1, 3)
  .flatMapLatest(function (x) { // `switch` these days...
    return Rx.Observable.range(x * 100, 2);
  });
source.subscribe(value => console.log('I got a value ', value));
Result:
I got a value 100
I got a value 200
I got a value 300
I got a value 301
I believe (IIUC) that the diagram is something like this (note the struck-through 101 and 201, which are unsubscribed):
----1---------2------------------3------------------------------|
░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------
Question:
Is it always guaranteed that 2 will arrive before (101), and likewise that 3 will arrive before (201)?
I mean, if I'm not supposed to rely on the timeline, then it would be perfectly legal for the following diagram to occur:
----1---------------2---------------3------------------------------|
░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------
Where 2 arrived with a slight delay, after 101 had already been emitted.
What am I missing here? How does the pipe work here?
For this particular Observable chain with this particular RxJS version, the order of emissions is always going to be the same.
As already mentioned, in RxJS 4 range uses the currentThread scheduler, as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
All schedulers (except the immediate scheduler from RxJS 4) internally use some type of queue, so the order is always the same.
The order of events is very similar to what you showed in the diagram (... or at least I think it is):
1. 1 is scheduled and emitted because it's the only action in the queue.
2. 100 is scheduled. At this point there are no more actions in the Scheduler's queue because 2 hasn't been scheduled yet. The RangeObservable schedules another emission recursively after it calls onNext(). This means that 100 is scheduled before 2.
3. 2 is scheduled.
4. 100 is emitted, 101 is scheduled.
5. 2 is emitted, 101 is disposed.

Note that this behavior is different in RxJS 4 and RxJS 5.
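The queue-based ordering in the steps above can be reproduced with a small model that does not use RxJS at all. This is only a rough sketch: createScheduler and scheduledRange are hypothetical helpers that assume a single FIFO queue and a range that schedules its next value only after delivering the current one. They are not the actual RxJS 4 implementation.

```javascript
// Simplified model of a queue-based ("trampolining") scheduler.
// Illustration only, not the RxJS 4 source.
function createScheduler() {
  const queue = [];
  let draining = false;
  return function schedule(action) {
    queue.push(action);
    if (draining) return; // a caller further up the stack is already draining
    draining = true;
    while (queue.length > 0) {
      queue.shift()(); // run actions strictly in the order they were scheduled
    }
    draining = false;
  };
}

// Hypothetical range: every emission is its own scheduled action, and the
// next emission is scheduled only after the current value was delivered.
function scheduledRange(schedule, start, count, onNext) {
  let disposed = false;
  function emit(i) {
    schedule(() => {
      if (disposed) return; // a disposed range silently drops its pending action
      onNext(start + i);
      if (i + 1 < count) emit(i + 1);
    });
  }
  emit(0);
  return () => { disposed = true; };
}

const schedule = createScheduler();
const received = [];
let disposeInner = () => {};

// Outer range(1, 3) with a hand-rolled "switch to the latest inner range".
scheduledRange(schedule, 1, 3, (x) => {
  disposeInner(); // drop the previous inner range
  disposeInner = scheduledRange(schedule, x * 100, 2, (v) => received.push(v));
});

console.log(received); // values in order: 100, 200, 300, 301
```

In this model 101 and 201 are never delivered, because the action that would emit them always sits behind the outer action that disposes their range.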
In RxJS 5, most Observables and operators don't use any Scheduler by default (the obvious exceptions are Observables and operators that need to work with delays). So in RxJS 5 the RangeObservable won't schedule anything; it starts emitting values right away in a loop.
The same example in RxJS 5 will produce a different result:
import { Observable } from 'rxjs/Rx'; // RxJS 5

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2);
  });
source.subscribe(value => console.log('I got a value ', value));
This will print the following:
I got a value 100
I got a value 101
I got a value 200
I got a value 201
I got a value 300
I got a value 301
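One way to see why nothing is dropped here is to model the scheduler-less range as a plain loop. The sketch below is an assumption-laden illustration (syncRange is a hypothetical helper, not RxJS code): each inner range finishes both emissions before control returns to the outer loop, so there is never a pending inner value to unsubscribe from.

```javascript
// Hypothetical synchronous range: a plain loop, no scheduler involved.
function syncRange(start, count, onNext) {
  for (let i = 0; i < count; i++) {
    onNext(start + i);
  }
}

const received = [];
syncRange(1, 3, (x) => {
  // The inner loop completes before the outer loop produces its next value.
  syncRange(x * 100, 2, (v) => received.push(v));
});

console.log(received); // values in order: 100, 101, 200, 201, 300, 301
```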
However, this will change significantly if you add, for example, delay(0). Common sense suggests that this shouldn't do anything:
const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2).delay(0);
  });
source.subscribe(value => console.log('I got a value ', value));
Now only the inner RangeObservable is scheduled, and it is disposed and rescheduled several times, which makes it emit only the values from the very last RangeObservable:
I got a value 300
I got a value 301
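The effect can be approximated without RxJS. The sketch below assumes that the delayed values are delivered from a timer queue that only runs after the current synchronous code has finished; the timers array, defer, and delayedRange are hypothetical stand-ins used to keep the example deterministic, not RxJS internals.

```javascript
// Stand-in for the host's timer queue: callbacks run only when flushed.
const timers = [];
function defer(fn) {
  const task = { fn, cancelled: false };
  timers.push(task);
  return () => { task.cancelled = true; };
}

// Hypothetical "range(...).delay(0)": every value is deferred to the timer queue.
function delayedRange(start, count, onNext) {
  const cancels = [];
  for (let i = 0; i < count; i++) {
    cancels.push(defer(() => onNext(start + i)));
  }
  return () => cancels.forEach((cancel) => cancel());
}

const received = [];
let cancelInner = () => {};

// The outer range is synchronous, so all three values arrive back to back.
for (let x = 1; x <= 3; x++) {
  cancelInner(); // switch: cancel the previous inner range
  cancelInner = delayedRange(x * 100, 2, (v) => received.push(v));
}

// Only now, after the synchronous code is done, do the timers fire.
for (const task of timers) {
  if (!task.cancelled) task.fn();
}

console.log(received); // values in order: 300, 301
```

The first two inner ranges are cancelled before any of their deferred values can fire, which is why only the last one gets through.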