How to use node's transform stream in rxjs?

I've been playing with RxJS for some time now, and I like how I can use its operators for logic rather than imperative programming. However, I also like Node's streams, which are highly composable too, so my obvious reaction was to use them both. I haven't seen the combination mentioned much (actually, not at all) besides the binding for it in the RxJS book.

So my question is: how do I make use of all the transform streams on npm with RxJS? Is it even possible?
Example:

var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var src = fs.createReadStream('./myFile.csv');
src.pipe(csv).pipe(process.stdout);

Essentially, I would want to do this:

var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var rx= require('rx-node');
var src = fs.createReadStream('./myFile.csv');

var obj = rx.fromReadableStream(src);
obj.pipe(csv).map(x=>console.log(x));

I've been told to use highland in the past, but I'm strictly looking for an RxJS solution here.

shriek asked Dec 11 '15 19:12


1 Answer

You don't have to use rx-node, but you can! Remember: all streams are event emitters!
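To make the event-emitter point concrete, here is a minimal sketch that uses only Node's built-in modules (no Rx at all). The `upper` transform and the `received` array are hypothetical names chosen for illustration. It shows that a transform stream is an `EventEmitter` and that its output can be observed with ordinary `'data'` and `'end'` listeners, which is what makes wrapping it in an observable possible.

```javascript
const { Transform } = require('stream');
const { EventEmitter } = require('events');

// A tiny transform stream that upper-cases each chunk (illustrative only).
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// Streams inherit from EventEmitter, so plain event listeners work on them.
const isEmitter = upper instanceof EventEmitter;

// Collect whatever the transform emits on its readable side.
const received = [];
upper.on('data', chunk => received.push(chunk.toString()));
upper.on('end', () => console.log('emitter:', isEmitter, 'chunks:', received));

// Feed the writable side by hand instead of piping a file into it.
upper.write('hello');
upper.write('world');
upper.end();
```

Anything that can subscribe to events, including an Rx helper that listens for a named event, can consume the stream in the same way.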

Prepare: input.txt

Hello World!
Hello World!
Hello World!
Hello World!
Hello World!

Run:

npm install through2 split2 rx rx-node

And in the index.js:

var Rx = require('rx');
Rx.Node = require('rx-node');
var fs = require('fs');
var th2 = require('through2');
var split2 = require('split2');

var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err'));

var transform = th2(function(ch, en, cb) {
  cb(null, ch.toString());
}).on('error', function(err) {
  console.log(err, err.toString());
});

// All streams are event emitters ! (one way without using rx-node)
// var subs = Rx.Observable.fromEvent(transform, 'data').share();
// subs
//   .map(value => 'Begin line: ' + value)
//   .subscribe(value => console.log(value));

// rx-node has convenience functions (another way)
var subs = Rx.Node.fromTransformStream(transform).share()
  .map(value => 'Begin line: ' + value)
  .subscribe(value => console.log(value));

file.pipe(split2()).pipe(transform);

Output:

Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
edin-m answered Sep 19 '22 10:09