I want to stop of executing of my async.queue after first task error was occurred. I need to perform several similar actions in parallel with the concurrency restriction, but stop all the actions after first error. How can I do that or what should I use instead?
Assuming you fired 5 parallel functions, each will take 5 seconds. While in 3rd second, function 1 failed. Then how you can stop the execution of the rest?
It depends of what those functions do, you may poll using setInterval. However if your question is how to stop further tasks to be pushed to the queue. You may do this:
q.push(tasks, function (err) {
if (err && !called) {
//Will prevent async to push more tasks to the queue, however please note that
//whatever pushed to the queue, it will be processed anyway.
q.kill();
//This will not allow double calling for the final callback
called = true;
//This the main process callback, the final callback
main(err, results);
}
});
Here a full working example:
var async = require('async');
/*
This function is the actual work you are trying to do.
Please note for example if you are running child processes
here, by doing q.kill you will not stop the execution
of those processes, so you need actually to keep track the
spawned processed and then kill them when you call q.kill
in 'pushCb' function. In-case of just long running function,
you may poll using setInterval
*/
function worker(task, wcb) {
setTimeout(function workerTimeout() {
if (task === 11 || task === 12 || task === 3) {
return wcb('error in processing ' + task);
}
wcb(null, task + ' got processed');
}, Math.floor(Math.random() * 100));
}
/*
This function that will push the tasks to async.queue,
and then hand them to your worker function
*/
function process(tasks, concurrency, pcb) {
var results = [], called = false;
var q = async.queue(function qWorker(task, qcb) {
worker(task, function wcb(err, data) {
if (err) {
return qcb(err); //Here how we propagate error to qcb
}
results.push(data);
qcb();
});
}, concurrency);
/*
The trick is in this function, note that checking q.tasks.length
does not work q.kill introduced in async 0.7.0, it is just setting
the drain function to null and the tasks length to zero
*/
q.push(tasks, function qcb(err) {
if (err && !called) {
q.kill();
called = true;
pcb(err, results);
}
});
q.drain = function drainCb() {
pcb(null, results);
}
}
var tasks = [];
var concurrency = 10;
for (var i = 1; i <= 20; i += 1) {
tasks.push(i);
}
process(tasks, concurrency, function pcb(err, results) {
console.log(results);
if (err) {
return console.log(err);
}
console.log('done');
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With