I have trouble understanding the execution model/order of RxJS Observables and Subjects. I read a lot of literature and blog posts about RxJS observables being the better promise since their subscription can be canceled and they can emit multiple results/values via next().
This question might be answered easily, but how does RxJS create or simulate asynchronism? Do RxJS Observables wrap around promises and create a sequence of promises to make the code execution asynchronous? Or is it because of the implemented observable pattern that change is propagated asynchronously to subscribers while code execution is still synchronous?
In my point of view, JavaScript code is asynchronous when it is handled via callbacks in any of the JavaScript callback queues processed by the event loop.
In RxJS, everything is about the producer. The producer can be anything, and it can be synchronous or asynchronous, so Observables can emit either synchronously or asynchronously.
Let's try to understand what (a)synchronous behavior is. I will leave a couple of links for a deeper understanding of the subject: a talk by Philip Roberts, another talk by Jake Archibald, and Jake's blog if you don't like watching long videos.
Tl;dw(atch): all JavaScript code is synchronous and executes within a single thread. On the other hand, WebAPIs, which can be accessed from JS code, may execute some other work in other threads and bring the result back to the JavaScript runtime. The results are passed to the runtime by the event loop and callbacks. So, when you say:
In my point of view, JavaScript code is asynchronous when it is handled via callbacks in any of the JavaScript callback queues processed by the event loop.
You're right. A callback handled by the event loop is an asynchronous callback. Examples of APIs that have asynchronous callbacks are: setTimeout and setInterval, DOM events, XHR events, Fetch events, Web workers, Web sockets, Promises, MutationObserver callbacks and so on. The last two (Promises and MutationObservers) schedule tasks on a different queue (the microtask queue), but it's still asynchronous.
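To make the difference between the two queues concrete, here is a minimal sketch in plain JavaScript (no RxJS). The function name runOrderDemo is only an illustrative name. It records when synchronous code, a Promise callback (microtask) and a setTimeout callback (macrotask) run:

```javascript
// Records the order in which synchronous code, a microtask and a macrotask run.
function runOrderDemo() {
  const order = [];
  return new Promise((resolve) => {
    // Macrotask: runs after the current script and all pending microtasks.
    setTimeout(() => {
      order.push('setTimeout callback');
      resolve(order);
    }, 0);

    // Microtask: runs as soon as the current synchronous code has finished.
    Promise.resolve().then(() => order.push('promise callback'));

    // Plain synchronous code: runs first.
    order.push('synchronous code');
  });
}

runOrderDemo().then((order) => console.log(order.join(' -> ')));
// synchronous code -> promise callback -> setTimeout callback
```

Both callbacks are asynchronous, since neither runs before the synchronous code has finished, but the microtask is always processed before the next macrotask.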
Back to RxJS. As I said, in RxJS everything is about the producer. Observables wrap producers using observers. To quote Ben Lesh from the article:
[A producer] is anything you’re using to get values and pass them to observer.next(value).
This means that the code that is synchronous (and all JS code is) will synchronously emit values when wrapped with an Observable. For example:
import { Observable } from 'rxjs';

const o = new Observable(observer => {
  // Synchronous producer: every value is emitted while subscribe() is running.
  [1, 2, 3].forEach(i => observer.next(i));
  observer.complete();
});

o.subscribe(x => console.log(x));
console.log('Anything logged after this?');
Logs:
1
2
3
Anything logged after this?
On the other hand, the next example uses setTimeout (which is not part of the ECMAScript specification and uses an asynchronous callback):
import { Observable } from 'rxjs';

const o = new Observable(observer => {
  // Asynchronous producer: the value is emitted from a timer callback.
  setTimeout(() => {
    observer.next(1);
    observer.complete();
  }, 0);
});

o.subscribe(x => console.log(x));
console.log('Anything logged after this?');
Logs this:
Anything logged after this?
1
This means that, even though I subscribed to the source observable before the last console.log, we got the message before the observer sent the next value. This is because of the asynchronous nature of setTimeout.
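The two examples differ only in their producer. To show that nothing else is involved, here is a rough sketch with a hypothetical toy wrapper (createObservable is my own illustrative helper, not the RxJS implementation) that just stores the producer and runs it on subscribe:

```javascript
// Hypothetical toy wrapper, NOT the RxJS implementation:
// it only stores the producer function and runs it on subscribe.
function createObservable(producer) {
  return {
    subscribe(next) {
      producer({ next });
    },
  };
}

const log = [];

// Synchronous producer: values arrive before subscribe() returns.
const syncSource = createObservable((observer) => {
  [1, 2, 3].forEach((i) => observer.next(i));
});

// Asynchronous producer: the value arrives later, from a timer callback.
const asyncSource = createObservable((observer) => {
  setTimeout(() => observer.next('late'), 0);
});

asyncSource.subscribe((x) => log.push(x));
syncSource.subscribe((x) => log.push(x));
log.push('after subscribe');

// At this point only the synchronous values have been recorded.
console.log(log.join(', ')); // 1, 2, 3, after subscribe

// 'late' is appended once the timer callback has run.
setTimeout(() => console.log(log.join(', ')), 10);
```

The wrapper itself never defers anything. Whether a subscriber is called now or later is decided entirely by the producer.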
In fact, RxJS has many ways of creating Observables so that you don't have to write your own implementations by wrapping all of this.
So, the improved first example:
import { from } from 'rxjs';
from([1, 2, 3]).subscribe(i => console.log(i));
console.log('Anything logged after this?');
Or the improved second example:
import { of, scheduled, asyncScheduler } from 'rxjs';
scheduled(of(1), asyncScheduler).subscribe(i => console.log(i));
console.log('Anything logged after this?');
The scheduled creation operator uses schedulers to dispatch events on different task queues. asyncScheduler internally uses a timer (setInterval in current RxJS versions) to dispatch the event to the macrotask queue, while asapScheduler internally uses Promises, so it dispatches to the microtask queue.
However, setTimeout is the most obvious and most repeated example of asynchronous behavior. XHR is the one that is much more interesting to us. Angular's HTTP client does the same wrapping as I did in my first two examples, so that, when the response comes, it is passed to the responseObserver using next.
When the response comes from the server, the XMLHttpRequest object puts a task on the macrotask queue, which the event loop pushes onto the call stack once the call stack is clear, and the message can then be passed to the responseObserver.
This way, the asynchronous event happens, and the subscribers to the Observable that wraps that XMLHttpRequest object get their value asynchronously.
I read a lot of literature and blog posts about RxJS observables being the better promise since their subscription can be canceled and they can emit multiple results/values via next().
The difference between Observables and Promises is indeed that Observables are cancelable. This matters most when you work a lot with WebAPIs, as many of them need a way to be canceled (so that resources are not wasted when we stop using them).
In fact, since RxJS has many creation operators that wrap many of the WebAPIs, they already deal with cancelation for you. All you have to do is keep track of the subscriptions and unsubscribe at the right moment. An article that might be helpful for that can be found here.
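As a rough illustration of what cancelation means for a wrapped WebAPI, here is a sketch with a hypothetical toy wrapper (createCancelableObservable is an illustrative name, not an RxJS API). It assumes the producer returns a teardown function, and unsubscribe() simply calls it:

```javascript
// Hypothetical toy wrapper, NOT the RxJS implementation: the producer
// returns a teardown function, and unsubscribe() simply calls it.
function createCancelableObservable(producer) {
  return {
    subscribe(next) {
      let closed = false;
      const teardown = producer({
        next: (value) => {
          if (!closed) next(value); // ignore values after unsubscribe
        },
      });
      return {
        unsubscribe() {
          if (closed) return;
          closed = true;
          teardown();
        },
      };
    },
  };
}

let timerCleared = false;

// Producer backed by setInterval; the teardown releases the timer.
const ticks = createCancelableObservable((observer) => {
  let count = 0;
  const id = setInterval(() => observer.next(count++), 10);
  return () => {
    clearInterval(id);
    timerCleared = true;
  };
});

const received = [];
const subscription = ticks.subscribe((n) => received.push(n));

// Stop listening after a few ticks. The interval is cleared, so no
// further values are delivered and the timer no longer holds resources.
setTimeout(() => {
  subscription.unsubscribe();
  console.log('received before unsubscribe:', received);
}, 35);
```

A Promise has no equivalent hook: once created, the underlying work cannot be told to stop through the Promise itself.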
Do RxJS Observables wrap around promises and create a sequence of promises to make the code execution asynchronous?
No, they wrap a producer: anything that can call the observer.next method. If a producer uses asynchronous callbacks which call the observer.next method, then Observables emit asynchronously. Otherwise, they emit synchronously.
But even when the original emissions are synchronous, they can be dispatched asynchronously by using schedulers.
A good rule of thumb is that in RxJS everything is synchronous unless you work with time. This default behavior changed between RxJS 4 and RxJS 5+. So, for example, range(), from() and of() are all synchronous. All inner subscriptions inside switchMap, mergeMap, forkJoin, etc. are synchronous. This means that you can easily create infinite loops if you emit from subscribe():
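import { Subject } from 'rxjs';
import { takeUntil, tap } from 'rxjs/operators';

const subject$ = new Subject();
const stop$ = new Subject();

subject$.pipe(
  tap(() => { /* whatever */ }),
  takeUntil(stop$),
).subscribe(() => {
  subject$.next(); // synchronously re-enters this same callback
  stop$.next();    // never reached
});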
This example will never reach stop$.next().
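The re-entrancy is easier to see without RxJS. Below is a minimal sketch with a hypothetical toy subject (createSubject is an illustrative helper, not the RxJS Subject) whose next() calls listeners synchronously. A depth guard is added only so that the demo terminates:

```javascript
// Hypothetical minimal subject: next() calls every listener synchronously.
function createSubject() {
  const listeners = [];
  return {
    subscribe: (fn) => listeners.push(fn),
    next: (value) => listeners.forEach((fn) => fn(value)),
  };
}

const trace = [];
const subject = createSubject();

subject.subscribe((depth) => {
  trace.push(`enter ${depth}`);
  // The guard exists only so the demo terminates; without it the
  // recursion would continue until the call stack overflows.
  if (depth < 2) {
    subject.next(depth + 1); // re-enters this same callback immediately
  }
  trace.push(`after next ${depth}`); // reached only once the inner calls return
});

subject.next(0);
console.log(trace.join(' | '));
// enter 0 | enter 1 | enter 2 | after next 2 | after next 1 | after next 0
```

Every line after the inner next() call has to wait for the nested call to return. Without the guard that never happens, which is why the statement following subject$.next() in the example above is never executed.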
A common source of confusion is using combineLatest() with synchronous sources. For example, both combineLatest() and range() emit synchronously. Try to guess what series of values this chain emits. We want to get all combinations from the two range Observables:
import { combineLatest, range } from 'rxjs';

combineLatest([
  range(1, 5),
  range(1, 5),
]).subscribe(console.log);
Live demo: https://stackblitz.com/edit/rxjs-p863rv
This emits only five values, where the first number is always 5, which looks weird at first sight. The first range() emits all of its values synchronously before combineLatest() even subscribes to the second one. To get more combinations, we would have to chain each range() with delay(0), pass asyncScheduler to it, or use the subscribeOn(asyncScheduler) operator to force async behavior.
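import { asyncScheduler, combineLatest, range } from 'rxjs';

combineLatest([
  // Each range now emits its values from scheduled (asynchronous) callbacks.
  range(1, 5, asyncScheduler),
  range(1, 5, asyncScheduler),
]).subscribe(console.log);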
Live demo: https://stackblitz.com/edit/rxjs-tnxonz
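To see why the fully synchronous version produces only pairs starting with 5, here is a simplified sketch in plain JavaScript. rangeSync and combineLatestSync are hypothetical stand-ins that I wrote for illustration; they only mimic the subscription order and are not how RxJS implements these operators:

```javascript
// Hypothetical stand-in for a synchronous range(): a function that takes
// a callback and calls it once per value, all within a single loop.
function rangeSync(start, count) {
  return (next) => {
    for (let i = 0; i < count; i++) next(start + i);
  };
}

// Hypothetical, simplified combineLatest for two synchronous sources.
function combineLatestSync(sourceA, sourceB, next) {
  const UNSET = Symbol('unset');
  let latestA = UNSET;
  let latestB = UNSET;
  const emitIfReady = () => {
    if (latestA !== UNSET && latestB !== UNSET) next([latestA, latestB]);
  };
  // Subscribing to A runs its whole loop before B is even subscribed,
  // so A is already at its last value when B starts emitting.
  sourceA((a) => { latestA = a; emitIfReady(); });
  sourceB((b) => { latestB = b; emitIfReady(); });
}

const pairs = [];
combineLatestSync(rangeSync(1, 5), rangeSync(1, 5), (pair) => pairs.push(pair));
console.log(JSON.stringify(pairs)); // [[5,1],[5,2],[5,3],[5,4],[5,5]]
```

Nothing is emitted while the first source runs, because the second source has no value yet. By the time the second source starts, the first one has already finished with 5 as its latest value.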