If I have a Node.js stream, for example process.stdin or one returned by fs.createReadStream, how can I convert it into an RxJS Observable stream using RxJS 5?
I see that rx-node has a fromReadableStream method, but it looks like it hasn't been updated in close to a year.
For anyone looking for this: following Mark's recommendation, I adapted rx-node's fromStream implementation for RxJS 5.
import { Observable } from 'rxjs';
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
/**
 * Wraps a Node.js stream in an Observable that forwards its events.
 *
 * The stream is paused immediately and only resumed once a subscriber has
 * attached its listeners. The returned Observable is passed through
 * `.share()`.
 *
 * Note: this bypasses stream back pressure; chunks are pushed to the
 * observer as soon as the stream emits them.
 *
 * @param {object} stream - Event-emitting stream exposing `pause()`,
 *   `resume()`, `addListener()` and `removeListener()`.
 * @param {string} [finishEventName='end'] - Event that completes the Observable.
 * @param {string} [dataEventName='data'] - Event whose payload is emitted via `next`.
 * @returns {Observable} Observable of the payloads of `dataEventName`; it
 *   errors on the stream's 'error' event and completes on `finishEventName`.
 */
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  // Hold the stream until the first subscriber has attached its listeners.
  stream.pause();

  return new Observable((observer) => {
    const dataHandler = (data) => {
      observer.next(data);
    };
    const errorHandler = (err) => {
      observer.error(err);
    };
    const endHandler = () => {
      observer.complete();
    };

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    // Listeners are in place, so it is now safe to let the data flow.
    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);

      // Removing the listeners does not stop a flowing stream. Pause it so
      // chunks are not discarded while nobody is subscribed; a later
      // subscription resumes it again.
      stream.pause();
    };
  }).share();
}
Note that this intrinsically defeats the back-pressure mechanism of streams. Observables are a push technology: every input chunk is read and pushed to the observer as quickly as possible. Depending on your use case, this might not be the best solution.
The following should work for both v4 and v5 (disclaimer: untested):
fromStream: function (stream, finishEventName, dataEventName) {
stream.pause();
finishEventName || (finishEventName = 'end');
dataEventName || (dataEventName = 'data');
return Observable.create(function (observer) {
// This is the "next" event
const data$ = Observable.fromEvent(stream, dataEventName);
// Map this into an error event
const error$ = Observable.fromEvent(stream, 'error')
.flatMap(err => Observable.throw(err));
// Shut down the stream
const complete$ = Observable.fromEvent(stream, finishEventName);
// Put it all together and subscribe
const sub = data$
.merge(error$)
.takeUntil(complete$)
.subscribe(observer);
// Start the underlying node stream
stream.resume();
// Return a handle to destroy the stream
return sub;
})
// Avoid recreating the stream on duplicate subscriptions
.share();
},
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With