Node.js Streams vs. Observables

After learning about Observables, I find them quite similar to Node.js streams. Both have a mechanism of notifying the consumer whenever new data arrives, an error occurs or there is no more data (EOF).

I would love to learn about the conceptual/functional differences between the two. Thanks!

urish asked May 24 '15


1 Answer

Both Observables and Node.js's Streams allow you to solve the same underlying problem: asynchronously process a sequence of values. The main difference between the two, I believe, is related to the context that motivated each one's appearance. That context is reflected in the terminology and API.

On the Observables side, you have an extension to ECMAScript that introduces the reactive programming model. It tries to fill the gap between value generation and asynchronicity with the minimalist and composable concepts of Observer and Observable.

On the Node.js and Streams side, the goal was to create an interface for the asynchronous and performant processing of network streams and local files. The terminology derives from that initial context, so you get pipe, chunk, encoding, flush, Duplex, Buffer, etc. By taking a pragmatic approach that provides explicit support for particular use cases, you lose some ability to compose things because it's not as uniform. For example, you use push on a Readable stream and write on a Writable stream although, conceptually, you are doing the same thing: publishing a value.
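
To make that push/write parallel concrete, here is a small object-mode sketch (not from the original answer; the names source and sink are just illustrative):

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;

// Readable side: values are published from inside read() by calling push().
var source = new Readable({
    objectMode: true,
    read: function() {
        this.push({ n: 1 });   // publish a value
        this.push(null);       // signal the end of the stream
    }
});

// Writable side: values are published into the stream by calling write(),
// which ends up in the write() implementation below.
var sink = new Writable({
    objectMode: true,
    write: function(chunk, encoding, callback) {
        console.log('received', chunk);
        callback();
    }
});

source.pipe(sink);

Both push and write put one value into the pipeline; the difference is only which side of the pipe you are standing on.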

So, in practice, if you look at the concepts, and if you use the option { objectMode: true }, you can match Observable with the Readable stream and Observer with the Writable stream. You can even create some simple adapters between the two models.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

// A minimal Observable: subscribing runs the given subscriber function.
var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

// A Subscription only knows how to undo a subscription.
var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

// Adapt a Readable stream to the Observable interface: stream events are
// forwarded to the observer's next/return/throw handlers.
Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        // Unsubscribing simply detaches the listeners again.
        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

// An Observer is just a bag of (optional) next/return/throw handlers.
var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

// Adapt a Writable stream to the Observer interface: published values are
// written to the stream and, optionally, completion ends it.
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable),
        return: shouldEnd ? writable.end.bind(writable) : function() {},
        throw: throwFn
    });
}

You may have noticed that I changed a few names and used the simpler concepts of Observer and Subscription, introduced here, to avoid the overloading of responsibilities that Observables put on Generator. Basically, the Subscription allows you to unsubscribe from the Observable. Anyway, with the above code you can have a pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout)); 
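
And because subscribe returns the Subscription, you also get the other half for free. For example (an illustrative addition, not in the original answer), you could stop the pipe after a while:

var subscription = Observable.fromReadable(process.stdin)
    .subscribe(Observer.fromWritable(process.stdout));

// Detach the underlying 'data'/'end'/'error' listeners after 5 seconds.
setTimeout(function() {
    subscription.unsubscribe();
}, 5000);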

Compared with process.stdin.pipe(process.stdout), what you have is a way to combine, filter, and transform streams that also works for any other sequence of data. You can achieve the same with Readable, Transform, and Writable streams, but the API favors subclassing instead of chaining Readables and applying functions. On the Observable model, for example, transforming values corresponds to applying a transformer function to the stream; it does not require a new subtype of Transform.

// Emit the given arguments in order and then complete.
Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

// Apply a transformer function to every value published by this Observable.
Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5)
    .transform(JSON.stringify)
    .subscribe(Observer.fromWritable(process.stdout));
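
For comparison, the stream-API version of that JSON.stringify step would typically be a Transform subclass, along these lines (a sketch, not part of the original answer; the usage line is hypothetical):

var Transform = require('stream').Transform;
var util = require('util');

// A Transform stream that accepts objects and emits their JSON representation.
function Stringify() {
    Transform.call(this, { writableObjectMode: true });
}
util.inherits(Stringify, Transform);

Stringify.prototype._transform = function(chunk, encoding, callback) {
    callback(null, JSON.stringify(chunk));  // push the transformed value downstream
};

// Hypothetical usage: someObjectStream.pipe(new Stringify()).pipe(process.stdout);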

The conclusion? It's easy to introduce the reactive model and the Observable concept anywhere. It's harder to implement an entire library around that concept, because all those little functions need to work together consistently. After all, the ReactiveX project is still at it. But if you really need to send a file's contents to the client, deal with encoding, and zip it, then the support is there in Node.js and it works pretty well.
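
For instance, that last use case is roughly this (an illustrative sketch with a placeholder file name, not code from the answer):

var http = require('http');
var fs = require('fs');
var zlib = require('zlib');

http.createServer(function(req, res) {
    res.writeHead(200, { 'Content-Type': 'text/plain', 'Content-Encoding': 'gzip' });
    fs.createReadStream('file.txt')   // 'file.txt' is a placeholder path
        .pipe(zlib.createGzip())      // compress on the fly
        .pipe(res);                   // stream the compressed bytes to the client
}).listen(8080);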

m4ktub answered Sep 26 '22