I'm using csvtojson, a neat library for processing CSV files.
I've got a use case where I need to process a large (>2 million rows) CSV and insert its rows into a DB.
To do this without running into memory issues, I intend to process the CSV as a stream, pausing the stream every 10000 rows, inserting those rows into my DB, and then resuming the stream.
For some reason I can't seem to pause the stream.
Take for example the following code:
const fs = require("fs");
const csv = require("csvtojson");

const rs = fs.createReadStream("./foo.csv");
rs.pause();

let count = 0;

csv()
  .fromStream(rs)
  .on("json", (json) => {
    count++;
    console.log(count);
  })
  .on("done", () => {
    cb(null, count); // `cb` is the enclosing function's callback
  })
  .on("error", (err) => {
    cb(err);
  });
count is logged 200 times (that's how many rows I have in my CSV). I was expecting it not to log anything, since the stream is paused before being passed to fromStream().
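A note on why this may happen (an assumption on my part, not something shown in the snippet above): fromStream() presumably pipes the read stream into the parser internally, and Node switches a paused Readable back into flowing mode as soon as it is piped to a destination, so the earlier pause() is effectively undone. A minimal illustration using only core fs streams:
const fs = require("fs");
const rs = fs.createReadStream("./foo.csv");
rs.pause();
// Piping switches the stream back into flowing mode, so the pause() above
// has no lasting effect: data starts flowing as soon as a destination is attached.
rs.pipe(process.stdout);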
Here's a solution suggested by the creator of the library, tracked in this Issue:
const { Writable } = require("stream");

var tmpArr = [];

rs.pipe(csv({}, { objectMode: true })).pipe(new Writable({
  objectMode: true,
  write: function (json, encoding, callback) {
    tmpArr.push(json);
    if (tmpArr.length === 10000) {
      // Hold off on the callback until the batch is saved; backpressure
      // keeps the parser from pushing more rows in the meantime.
      myDb.save(tmpArr, function () {
        tmpArr = [];
        callback();
      });
    } else {
      callback();
    }
  },
}))
.on("finish", function () {
  // Flush whatever is left once the stream ends.
  if (tmpArr.length > 0) {
    myDb.save(tmpArr, function () {
      tmpArr = [];
    });
  }
});
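The important design point in that snippet is backpressure: the Writable only invokes its write callback once the batch has been saved, so the piped CSV parser (and, in turn, the file stream) stops pushing new rows, beyond a small internal buffer, until the insert finishes; no manual pause()/resume() is needed. myDb.save is just a placeholder for your own bulk insert; a purely hypothetical stand-in, only so the snippet can be run end-to-end, could look like:
const myDb = {
  // Hypothetical: replace with a real bulk insert for your database driver.
  save: function (rows, done) {
    console.log("inserting " + rows.length + " rows");
    setImmediate(done); // simulate asynchronous DB work
  },
};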
I've actually managed to emulate pausing by unpiping like so, but it's not ideal:
let rows = [];

const csvParser = csv()
  .fromStream(rs)
  .on("json", (json) => {
    rows.push(json);
    if (rows.length % 1000 === 0) {
      rs.unpipe();
      // clear `rows` right after `unpipe`
      const entries = rows;
      rows = [];
      this._insertEntries(db, entries, () => {
        rs.pipe(csvParser);
      });
    }
  });
I have leveraged the fact that csvtojson also has a fromString(...) method, and used it as below:

1. Read the file with a line-by-line reader, collecting lines until BLOCK_LIMIT (10000) of them have been gathered, then call lr.pause().
2. Join the collected lines using the EOL character, which gives you a string representation of 10000 lines of that CSV file.
3. Use .fromString(...) to convert the string representation of the block into JSON objects and insert them into the DB.
4. Call lr.resume() and repeat until the line-by-line reader emits the 'end' event.

Here's the complete code:
const CSVToJSON = require("csvtojson");
const LineByLineReader = require("line-by-line");
const { EOL } = require("os");

const BLOCK_LIMIT = 10000;

let lines = [];
let headers = "";
let isFirstLineProcessed = false;

const lr = new LineByLineReader("./foo.csv");

lr
  .on("line", (line) => {
    // capture the headers line; remove this if statement if your CSV
    // does not contain a headers line
    if (!isFirstLineProcessed) {
      headers = line;
      isFirstLineProcessed = true;
      return;
    }

    lines.push(line);

    if (lines.length === BLOCK_LIMIT) {
      lr.pause();

      // insert the headers string ("field1,field2,...") at index 0
      lines.splice(0, 0, headers);

      // join all lines using the EOL character to form a valid CSV string
      const csvBlockString = lines.join(EOL);
      const entries = [];
      lines = [];

      CSVToJSON()
        .fromString(csvBlockString)
        .on("json", (json) => {
          entries.push(json);
        })
        .on("done", () => {
          this._insertEntries(db, entries, () => {
            lr.resume();
          });
        });
    }
  })
  .on("end", () => {
    console.log("done");
  });
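For completeness: both the unpipe workaround and the block above assume a this._insertEntries(db, entries, cb) helper on the surrounding class, plus a db handle. A trivial, purely hypothetical stand-in (replace the body with a bulk insert for your own database driver):
function _insertEntries(db, entries, done) {
  // Hypothetical: log the batch size and simulate asynchronous DB work.
  console.log("inserting " + entries.length + " entries");
  setImmediate(done);
}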