I've been playing with rxjs
for some time now and I like how I can use it's operators for logic rather than imperative programming.
However, I also like node's stream which are also highly composable so my obvious reaction was to use them both but I haven't seen it being mentioned a lot (actually, I haven't at all) besides the binding for it in rxjs's book.
So, my question really is, how do I make use of all the transform streams that are in npm on RxJS? Or, is it even possible?
Example:-
var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var src = fs.createReadStream('./myFile.csv');
src.pipe(csv).pipe(process.stdout);
Essentially, I would want to do this:-
var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var rx= require('rx-node');
var src = fs.createReadStream('./myFile.csv');
var obj = rx.fromReadableStream(src);
obj.pipe(csb).map(x=>console.log(x));
I've been told to use highland
in the past but I'm strictly looking for rxjs
solution here.
You don't have to use rx-node
but you can! Remember: All streams are event emitters!
.
Prepare:
input.txt
Hello World!
Hello World!
Hello World!
Hello World!
Hello World!
Run:
npm install through2 split2 rx rx-node
And in the index.js
:
var Rx = require('rx');
Rx.Node = require('rx-node');
var fs = require('fs');
var th2 = require('through2');
var split2 = require('split2');
var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err'));
var transform = th2(function(ch, en, cb) {
cb(null, ch.toString());
}).on('error', function(err) {
console.log(err, err.toString());
});
// All streams are event emitters ! (one way without using rx-node)
// var subs = Rx.Observable.fromEvent(transform, 'data').share();
// subs
// .map(value => 'Begin line: ' + value)
// .subscribe(value => console.log(value));
// rx-node has convenience functions (another way)
var subs = Rx.Node.fromTransformStream(transform).share()
.map(value => 'Begin line: ' + value)
.subscribe(value => console.log(value));
file.pipe(split2()).pipe(transform);
Output:
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With