
How can I stop async.queue after the first failure?

I want to stop executing my async.queue after the first task error occurs. I need to perform several similar actions in parallel with a concurrency restriction, but stop all the actions after the first error. How can I do that, or what should I use instead?

asked Jan 10 '14 by Tolsi
1 Answer

Suppose you fired 5 parallel functions, each taking 5 seconds, and function 1 fails during its 3rd second. How do you stop the execution of the rest?

That depends on what those functions do; you may poll using setInterval. However, if your question is how to stop further tasks from being processed from the queue, you can do this:

 q.push(tasks, function (err) {
    if (err && !called) {
      //Prevents async from processing any more tasks from the queue. Note,
      //however, that tasks which are already running will finish anyway.
      q.kill();

      //This prevents the final callback from being called twice
      called = true;

      //This is the main process callback, the final callback
      main(err, results);
    }
  });
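As for stopping work that is already in flight, here is a minimal sketch (mine, not part of the original answer) of the setInterval polling idea mentioned above: each worker periodically checks a shared aborted flag and bails out early once any task has failed. The flag name, durations, and task shape are illustrative assumptions.

var async = require('async');

var aborted = false;

var q = async.queue(function (task, qcb) {
  var elapsed = 0;

  // Simulate a long-running job by polling every 100 ms.
  var timer = setInterval(function poll() {
    if (aborted) {                     // another task failed; give up early
      clearInterval(timer);
      return qcb();
    }

    elapsed += 100;
    if (elapsed >= task.duration) {    // simulated work finished
      clearInterval(timer);
      qcb(task.shouldFail ? new Error('task failed after ' + task.duration + ' ms') : null);
    }
  }, 100);
}, 5);

q.push(
  [{ duration: 200, shouldFail: true }, { duration: 5000 }, { duration: 5000 }],
  function (err) {
    if (err && !aborted) {
      aborted = true;  // signal in-flight workers to stop polling
      q.kill();        // prevent queued (not yet started) tasks from running
    }
  }
);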

Here is a full working example:

var async = require('async');

/*
   This function is the actual work you are trying to do.
   Note that if, for example, you are spawning child processes
   here, calling q.kill will not stop those processes; you would
   need to keep track of the spawned processes yourself and kill
   them when you call q.kill in the 'qcb' push callback below.
   For a plain long-running function, you may poll using setInterval.
*/
function worker(task, wcb) {
  setTimeout(function workerTimeout() {   
    if (task === 11 || task === 12 || task === 3) {
      return wcb('error in processing ' + task);
    }

    wcb(null, task + ' got processed');

  }, Math.floor(Math.random() * 100));
}


/*
  This function pushes the tasks to async.queue, which then
  hands them to your worker function.
*/
function process(tasks, concurrency, pcb) {
  var results = [], called = false;

  var q = async.queue(function qWorker(task, qcb) {

    worker(task, function wcb(err, data) {
      if (err) {
        return qcb(err); //Here how we propagate error to qcb
      }

      results.push(data);

      qcb();
    });

  }, concurrency);

 /*
   The trick is in this push callback. Note that checking q.tasks.length
   does not work here. q.kill, introduced in async 0.7.0, simply sets
   the drain function to null and empties the tasks list.
 */
  q.push(tasks, function qcb(err) {
    if (err && !called) {
      q.kill();
      called = true;
      pcb(err, results);
    }
  });

  q.drain = function drainCb() {
    pcb(null, results);
  };
}

var tasks = [];
var concurrency = 10;

for (var i = 1; i <= 20; i += 1) {
  tasks.push(i);
}

process(tasks, concurrency, function pcb(err, results) {
  console.log(results);

  if (err) {
    return console.log(err);
  }

  console.log('done');
});
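Note that this example targets the async versions current at the time (0.x to 2.x), where drain is assigned as a property. If you are on a newer release (async 3.x), drain is registered by calling q.drain(cb) instead. Here is a minimal self-contained sketch of the same pattern, assuming async 3.x is installed; the task values and timings are illustrative.

var async = require('async'); // assuming async >= 3.x

var killed = false;

var q = async.queue(function (task, qcb) {
  setTimeout(function () {
    qcb(task === 3 ? new Error('error in processing ' + task) : null);
  }, 50);
}, 2);

// In async 3.x, drain is a method that registers the callback;
// assigning q.drain = fn (as in the example above) no longer works.
q.drain(function () {
  console.log('all tasks processed');
});

q.push([1, 2, 3, 4, 5], function (err) {
  if (err && !killed) {
    killed = true;
    q.kill(); // still removes queued tasks and clears the drain callback
    console.log(err.message);
  }
});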
answered by Laith Shadeed