Recently I've started looking at RxJS and RxJava(from Netflix) libraries which work on the concept of Reactive Programming.
Node.js works on the basis of event loops, which provides you all the arsenal for asynchronous programming and the subsequent node libraries like "cluster" help you to get best out of your multi-core machine. And Node.js also provides you the EventEmitter functionality where you can subscribe to events and act upon it asynchronously.
On the other hand if I understand correctly RxJS (and Reactive Programming in general) works on the principle of event streams, subscribing to event streams, transforming the event stream data asynchronously.
So, the question is what does using Rx packages in Node.js mean. How different is the Node's event loop, event emitter & subscriptions to the Rx's streams and subscriptions.
RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables that makes it easier to compose asynchronous or callback-based code.
Node. js is also a great framework for reactive systems because its architecture is closely aligned with the reactive principles documented in the Reactive Manifesto and The Reactive Principles.
Introducing the ReactiveX API and the RxJS library The ReactiveX API is implemented through language-specific libraries. The ReactiveX library for JavaScript is a Node Package Manager (npm) module called RxJS (ReactiveX JavaScript). ReactiveX libraries are also available for other languages.
Observables are not like EventEmitters. They may act like EventEmitters in some cases, namely when they are multicasted using RxJS Subjects, but usually they don't act like EventEmitters.
In short, an RxJS Subject is like an EventEmitter, but an RxJS Observable is a more generic interface. Observables are more similar to functions with zero arguments.
Consider the following:
function foo() { console.log('Hello'); return 42; } var x = foo.call(); // same as foo() console.log(x); var y = foo.call(); // same as foo() console.log(y);
Of course we all expect to see as output:
"Hello" 42 "Hello" 42
You can write the same behavior above, but with Observables:
var foo = Rx.Observable.create(function (observer) { console.log('Hello'); observer.next(42); }); foo.subscribe(function (x) { console.log(x); }); foo.subscribe(function (y) { console.log(y); });
And the output is the same:
"Hello" 42 "Hello" 42
That's because both functions and Observables are lazy computations. If you don't call the function, the console.log('Hello')
won't happen. Also with Observables, if you don't "call" (subscribe
), the console.log('Hello')
won't happen. Plus, "calling" or "subscribing" is an independent operation: two function calls trigger two separate side effects, and two Observable subscribes trigger two separate side effects. As opposed to EventEmitters which share the side effects and have eager execution regardless of the existence of subscribers, Observables have no shared execution and are lazy.
So far, no difference between the behavior of a function and an Observable. This StackOverflow question would have been better phrased as "RxJS Observables vs functions?".
Some people claim that Observables are asynchronous. That is not true. If you surround a function call with logs, like this:
console.log('before'); console.log(foo.call()); console.log('after');
You will obviously see the output:
"before" "Hello" 42 "after"
And this is the same behavior with Observables:
console.log('before'); foo.subscribe(function (x) { console.log(x); }); console.log('after');
And the output:
"before" "Hello" 42 "after"
Which proves the subscription of foo
was entirely synchronous, just like a function.
So what is really the difference between an Observable and a function?
Observables can "return" multiple values over time, something which functions cannot. You can't do this:
function foo() { console.log('Hello'); return 42; return 100; // dead code. will never happen }
Functions can only return one value. Observables, however, can do this:
var foo = Rx.Observable.create(function (observer) { console.log('Hello'); observer.next(42); observer.next(100); // "return" another value observer.next(200); }); console.log('before'); foo.subscribe(function (x) { console.log(x); }); console.log('after');
With synchronous output:
"before" "Hello" 42 100 200 "after"
But you can also "return" values asynchronously:
var foo = Rx.Observable.create(function (observer) { console.log('Hello'); observer.next(42); observer.next(100); observer.next(200); setTimeout(function () { observer.next(300); }, 1000); });
With output:
"before" "Hello" 42 100 200 "after" 300
To conclude,
func.call()
means "give me one value immediately (synchronously)"obsv.subscribe()
means "give me values. Maybe many of them, maybe synchronously, maybe asynchronously"That's how Observables are a generalization of functions (that have no arguments).
When does a listener gets attached to Emitter ?
With event emitters, the listeners are notified whenever an event, that they are interested in happens. When a new listener is added after the event has occurred, he will not know about the past event. Also the new listener will not know the history of events that happened before. Ofcourse, we could manually program our emitter and listener to handle this custom logic.
With reactive streams, the subscriber gets the stream of events that happened from the beginning. So the time at which he subscribes is not strict. Now he can perform variety of operations on the stream to get the sub-stream of events that he is interested in.
The advantage of this comes out:
Higher order streams:
A Higher-order stream is a "stream of streams": a stream whose event values are themselves streams.
With Event emitters, one way of doing it is by having same listener attached to multiple event emitters. It becomes complex when we need to correlate the event happened on different emitters.
With reactive streams, it's a breeze. An example from mostjs (which is a reactive programming library, like RxJS but more performant)
const firstClick = most.fromEvent('click', document).take(1); const mousemovesAfterFirstClick = firstClick.map(() => most.fromEvent('mousemove', document) .takeUntil(most.of().delay(5000)))
In the above example, we correlate the click events with mouse move events. Deducting patterns across events become so easier to accomplish when events are available as a stream.
Having said that, with EventEmitter we could accomplish all this by over engineering our emitters and listeners. It needs over engineering because it is not intended in first place for such scenarios. Whereas reactive streams does this so fluently because it is intended to solve such problems.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With