Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Async queue, filestream end how to know when both finished

I am having a slight issue when using async.queue with a filestream

  1. I have a scenario where my filestream will finish
  2. I set fileRead to true
  3. however the queue will be empty and already have called drain
  4. this then leads my "done" to never be called

what is the proper way to say "end the queue" after my filestream is "end" and the queue is empty?

var fs = require('fs')
    , util = require('util')
    , stream = require('stream')
    , es = require('event-stream');

var async = require('async');

var fileRead = false;
var lineNr = 0;

var q = async.queue(function(task, callback) {
    task(function(err, lineData){
        responseLines.push(lineData);
        callback();
      });
  }, 5);

var q.drain = function() {
  if(fileRead){
    done(null, responseLines);
  }
}

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){
        s.pause();
        q.push(async.apply(insertIntoDb, line))
        s.resume();
    })
    .on('error', function(err){
       done(err);
    })
    .on('end', function(){
      fileRead = true;
    })
);

or is there a better use of async which would allow me to do this? async process line by line with the ability to exit early if one of the lines has errors

like image 349
user2950720 Avatar asked Feb 09 '17 19:02

user2950720


1 Answers

Firstly, I'm not sure how much of your snippet is pseudo code but var q.drain = ... is not valid javascript and should error. It should just be q.drain = as you're defining a property on an existing object not declaring a new variable. This could be why your drain function isn't firing if it isn't pseudo code.

There are a few ways you could achieve what I think you're trying to do. One would be to check the length of the queue in your end handler and set the drain function if there are still items to process.

.on('end', function(){
  if(!q.length){
    callDone();
  }
  else {
    q.drain = callDone;
  }
});

function callDone(){
  done(null, responseLines);
}

This is effectively saying "if the queue's been processed call done, if not, call done when it has!" I'm certain there are lots of ways to tidy up your code but hopefully this provides a solution to your specific problem.

like image 97
leesio Avatar answered Oct 12 '22 23:10

leesio