I am having a slight issue when using async.queue with a filestream
what is the proper way to say "end the queue" after my filestream is "end" and the queue is empty?
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
var async = require('async');
var fileRead = false;
var lineNr = 0;
var q = async.queue(function(task, callback) {
task(function(err, lineData){
responseLines.push(lineData);
callback();
});
}, 5);
var q.drain = function() {
if(fileRead){
done(null, responseLines);
}
}
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
s.pause();
q.push(async.apply(insertIntoDb, line))
s.resume();
})
.on('error', function(err){
done(err);
})
.on('end', function(){
fileRead = true;
})
);
or is there a better use of async which would allow me to do this? async process line by line with the ability to exit early if one of the lines has errors
Firstly, I'm not sure how much of your snippet is pseudo code but var q.drain = ...
is not valid javascript and should error. It should just be q.drain =
as you're defining a property on an existing object not declaring a new variable. This could be why your drain function isn't firing if it isn't pseudo code.
There are a few ways you could achieve what I think you're trying to do. One would be to check the length of the queue in your end handler and set the drain function if there are still items to process.
.on('end', function(){
if(!q.length){
callDone();
}
else {
q.drain = callDone;
}
});
function callDone(){
done(null, responseLines);
}
This is effectively saying "if the queue's been processed call done, if not, call done when it has!" I'm certain there are lots of ways to tidy up your code but hopefully this provides a solution to your specific problem.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With