Save a very big CSV to MongoDB using Mongoose

I have a CSV file containing more than 200'000 rows. I need to save it to MongoDB.

If I try a for loop, Node will run out of memory.

var fs = require('fs');

fs.readFile('data.txt', 'utf8', function(err, data) {
  if (err) throw err;

  var rows = data.split('\n');

  for (var i = 0; i < rows.length; i += 1) {
    var row = rows[i].split(',');

    var obj = { /* The object to save */ };

    var entry = new Entry(obj);
    entry.save(function(err) {
      if (err) throw err;
    });
  }
});

How can I avoid running out of memory?

asked Jul 31 '14 by Alberto


1 Answer

Welcome to streaming. What you really want is an "evented stream" that processes your input "one chunk at a time", and of course ideally by a common delimiter such as the "newline" character you are currently using.

For really efficient stuff, you can add usage of MongoDB "Bulk API" inserts to make your loading as fast as possible without eating up all of the machine memory or CPU cycles.

Not advocating it over the various other solutions available, but here is a listing that uses the line-input-stream package to make the "line terminator" part simple.

Schema definitions by "example" only:

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

// { strict: false } lets arbitrary fields from the CSV rows through the schema
var entrySchema = new Schema({},{ strict: false });

var Entry = mongoose.model( "Schema", entrySchema );

// A connection is assumed so the "open" handler below fires; the URI is an example only
mongoose.connect("mongodb://localhost/test");

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) return callback(err);  // don't fall through and call the callback twice
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               if (err) throw err;   // surface any bulk write error passed up
               // this line's iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

So generally the "stream" interface there "breaks the input down" in order to process "one line at a time", which stops you from loading the whole file into memory at once.
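
If you would rather not pull in an extra dependency for the line splitting, Node's built-in readline module gives you the same "one line at a time" behaviour. A minimal sketch only, assuming the same data.txt file; where you hand the parsed row off to is up to you:

var fs = require("fs"),
    readline = require("readline");

// readline emits one "line" event per newline-delimited chunk,
// so only a single line needs to be held in memory at a time
var rl = readline.createInterface({
    input: fs.createReadStream("data.txt")
});

rl.on("line", function(line) {
    var row = line.split(",");   // same per-line parsing as in the listing above
    // hand "row" off to your batching/insert logic here
});

rl.on("close", function() {
    console.log("finished reading the file");
});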

The main parts are the "Bulk Operations API" from MongoDB. This allows you to "queue up" many operations at a time before actually sending them to the server. So in this case, with the use of a "modulo" check, writes are only sent once per 1000 entries processed. You can really do anything up to the 16MB BSON limit, but keep it manageable.
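
As a side note, newer Mongoose releases also expose Model.insertMany(), and the same "queue up, then flush every 1000" idea can be written with it directly. This is a rough sketch only; queueDoc and the flush threshold are illustrative, not part of the listing above:

// Batch documents in memory and flush them in groups of 1000
var batch = [];

function queueDoc(doc, done) {
    batch.push(doc);
    if (batch.length >= 1000) {            // same "modulo" threshold as the listing
        var toWrite = batch;
        batch = [];                        // start collecting the next batch
        Entry.insertMany(toWrite, done);   // one round trip for up to 1000 documents
    } else {
        done();
    }
}

// remember to flush whatever is left in "batch" when the stream ends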

On top of the bulk batching, there is also a "limiter" in place from the async library. It's not strictly required, but it ensures that essentially no more than the "modulo limit" of documents are in process at any time. The batched "inserts" cost nothing but memory until they are sent, whereas the "execute" calls mean IO is actually happening, so we wait for them rather than queuing up more work.
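
The same throttling idea can also be expressed with async.queue, which simply caps how many rows are in flight at once. This is a comparison sketch rather than part of the listing; saveRow is a hypothetical insert helper:

var async = require("async");

// A queue with a concurrency cap: at most 1000 rows are being written at any time
var q = async.queue(function(row, callback) {
    saveRow(row, callback);   // hypothetical helper that performs one insert
}, 1000);

// inside the stream "line" handler you would push parsed rows:
//   q.push(line.split(","));

q.drain = function() {
    console.log("all queued rows have been written");
};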

There are surely better solutions you can find for "stream processing" CSV-type data, which this appears to be. But in general this gives you the concepts for how to do this in a memory-efficient manner without eating up CPU cycles either.

answered Oct 02 '22 by Neil Lunn