Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

NodeJS, promises, streams - processing large CSV files

I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).

So, I start with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as to not create memory pressures.
  2. The function passed as the processor param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my ahead around how to mix stream event handlers with these promise methods. Right now, I return a promise in the handler for readable section after each read(), which means I create a huge amount of promised database operations and eventually fault out because I hit a process memory limit.

Does anyone have a working example of this that I can use as a jumping point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Anyone sees a potential problem with this approach?

like image 341
alphadogg Avatar asked Oct 14 '15 15:10

alphadogg


4 Answers

Find below a complete application that correctly executes the same kind of task as you want: It reads a file as a stream, parses it as a CSV and inserts each row into the database.

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });

Note that the only thing I changed: using library csv-parse instead of csv, as a better alternative.

Added use of method stream.read from the spex library, which properly serves a Readable stream for use with promises.

like image 34
vitaly-t Avatar answered Sep 20 '22 06:09

vitaly-t


You might want to look at promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

Works with backpressure and everything.

like image 39
Gjorgi Kjosev Avatar answered Sep 21 '22 06:09

Gjorgi Kjosev


So to say you don't want streaming but some kind of data chunks? ;-)

Do you know https://github.com/substack/stream-handbook?

I think the simplest approach without changing your architecture would be some kind of promise pool. e.g. https://github.com/timdp/es6-promise-pool

like image 32
Markus Avatar answered Sep 21 '22 06:09

Markus


I found a slightly better way of doing the same thing; with more control. This is a minimal skeleton with precise parallelism control. With parallel value as one all records are processed in sequence without having the entire file in memory, we can increase parallel value for faster processing.

      const csv = require('csv');
      const csvParser = require('csv-parser')
      const fs = require('fs');

      const readStream = fs.createReadStream('IN');
      const writeStream = fs.createWriteStream('OUT');

      const transform = csv.transform({ parallel: 1 }, (record, done) => {
                                           asyncTask(...) // return Promise
                                           .then(result => {
                                             // ... do something when success
                                             return done(null, record);
                                           }, (err) => {
                                             // ... do something when error
                                             return done(null, record);
                                           })
                                       }
                                     );

      readStream
      .pipe(csvParser())
      .pipe(transform)
      .pipe(csv.stringify())
      .pipe(writeStream);

This allows doing an async task for each record.

To return a promise instead we can return with an empty promise, and complete it when stream finishes.

    .on('end',function() {
      //do something wiht csvData
      console.log(csvData);
    });
like image 188
Gagandeep Kalra Avatar answered Sep 23 '22 06:09

Gagandeep Kalra