
Pausing a readable stream in Node.js

Tags:

stream

node.js

I'm using csvtojson, a neat library for processing CSV files.

I've got a use case where I need to process a large (>2 million rows) CSV and insert it into a DB.

To do this without running into memory issues, I intend to process the CSV as a stream: pause the stream every 10,000 rows, insert those rows into my DB, and then resume the stream.
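In plain Node.js terms, the flow I have in mind looks roughly like this (just a sketch: saveBatch stands in for the real DB insert, and since 'data' chunks are arbitrary slices of the file rather than parsed rows, it only illustrates the pause/save/resume part):

const fs = require("fs");

const rs = fs.createReadStream("./foo.csv");
let batch = [];

rs.on("data", (chunk) => {
  batch.push(chunk);
  if (batch.length === 10) {    // arbitrary threshold for the sketch
    rs.pause();                 // stops further 'data' events
    saveBatch(batch, () => {    // placeholder for the real DB insert
      batch = [];
      rs.resume();              // carry on reading
    });
  }
});

rs.on("end", () => {
  if (batch.length > 0) saveBatch(batch, () => {});
});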

For some reason I can't seem to pause the stream.

Take for example the following code:

const fs = require("fs");
const csv = require("csvtojson");

const rs = fs.createReadStream("./foo.csv");
rs.pause();

let count = 0;

csv()
  .fromStream(rs)
  .on("json", (json) => {
    count++;
    console.log(count);
  })
  .on("done", () => {
    // `cb` is the enclosing function's callback
    cb(null, count);
  })
  .on("error", (err) => {
    cb(err);
  });

count is logged 200 times (that's how many rows this CSV has). I was expecting it not to log anything at all, since the stream is paused before it's handed over to fromStream().
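As far as I can tell, the pause() is simply being undone when the stream is handed over: piping a paused readable switches it back into flowing mode, and fromStream() presumably pipes the source internally. A minimal check of that behaviour, with nothing csvtojson-specific in it:

const fs = require("fs");
const { PassThrough } = require("stream");

const rs = fs.createReadStream("./foo.csv");
rs.pause();
console.log(rs.isPaused()); // true

// pipe() puts the source back into flowing mode,
// so the earlier pause() does not stick
rs.pipe(new PassThrough());
console.log(rs.isPaused()); // false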

asked Jan 21 '17 by nicholaswmin


2 Answers

Here's a solution suggested by the creator of the library, tracked in this Issue: instead of pausing the source manually, pipe it into an object-mode Writable and only call the write callback once each batch has been saved, so backpressure throttles the parser (and the file stream) while each batch is being written.

const fs = require("fs");
const { Writable } = require("stream");
const csv = require("csvtojson");

const rs = fs.createReadStream("./foo.csv");

let tmpArr = [];

rs.pipe(csv({}, { objectMode: true })).pipe(new Writable({
  objectMode: true,
  write: function (json, encoding, callback) {
    tmpArr.push(json);
    if (tmpArr.length === 10000) {
      // delay the callback until the batch is saved; the pending callback
      // applies backpressure, so the parser and the file stream wait
      myDb.save(tmpArr, function () {
        tmpArr = [];
        callback();
      });
    } else {
      callback();
    }
  }
}))
.on("finish", function () {
  // flush whatever is left after the last full batch
  if (tmpArr.length > 0) {
    myDb.save(tmpArr, function () {
      tmpArr = [];
    });
  }
});

I've actually managed to emulate pausing by unpiping like so, but it's not ideal:

let rows = [];

const csvParser = csv()
  .fromStream(rs)
  .on("json", (json) => {
    rows.push(json);
    if (rows.length % 1000 === 0) {
      // detach the parser so the file stream stops flowing
      rs.unpipe();

      // clear `rows` right after `unpipe`
      const entries = rows;
      rows = [];

      // `_insertEntries` saves the batch to the DB; re-attach the parser once it's done
      this._insertEntries(db, entries, () => {
        rs.pipe(csvParser);
      });
    }
  });
answered by nicholaswmin


I leveraged the fact that csvtojson also has a fromString(...) method, and used it as follows:

  1. Use the line-by-line package to read a fixed number of lines (10,000) and store them in an array.
  2. Pause the line-by-line reader using lr.pause().
  3. Insert the headers line at index 0 (if your CSV file has a header line, use a simple conditional to skip the first line returned by the reader and keep it as the headers string).
  4. Join all the lines with the EOL character, which gives you a string representation of those 10,000 lines of the CSV file.
  5. Use csvtojson's .fromString(...) to convert that string block into JSON objects and insert them into the DB.
  6. Resume the reader via lr.resume() and repeat until the line-by-line reader emits the 'end' event.

Here's the complete code:

const csv = require("csvtojson");
const LineByLineReader = require("line-by-line");
const { EOL } = require("os");

const BLOCK_LIMIT = 10000;

let lines = [];
let headers;
let isFirstLineProcessed = false;

const lr = new LineByLineReader("./foo.csv");

lr
.on("line", (line) => {

    // remove this if statement if your CSV does not contain a headers line
    if (!isFirstLineProcessed) {
        headers = line;
        isFirstLineProcessed = true;
        return;
    }

    lines.push(line);

    if (lines.length === BLOCK_LIMIT) {
        lr.pause();

        // insert the headers string ("field1,field2,...") at index 0
        lines.splice(0, 0, headers);

        // join all lines using the OS end-of-line character to form a valid CSV string
        const csvBlockString = lines.join(EOL);
        const entries = [];

        lines = [];

        csv()
            .fromString(csvBlockString)
            .on("json", (json) => {
                entries.push(json);
            })
            .on("done", () => {
                // `_insertEntries` is the DB helper from the question
                this._insertEntries(db, entries, () => {
                    lr.resume();
                });
            });
    }
})
.on("end", () => {
    console.log("done");
});
answered by Nishant Desai