Control the rate of a javascript asynchronous flow (in a loop)

Suppose you want to launch an (arbitrary) process for every folder in a list, with a short piece of code:

var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

_.each(folders, function(folder) {
    exec("tar cvf " + folder + ".tgz " + folder);
});

If the list is long, I might end up running a large number of processes concurrently, which is to be avoided. What's a fairly simple way to run the executions at a controlled rate (a maximum of 5 concurrent processes here)?

Edit: the question is about any kind of async flow whose rate you want to control, not just the exec-over-folders example.

Asked by Sebastien on Nov 04 '16.

3 Answers

Use the async package and its eachLimit function.

It does the same as lodash's each, but handles the async flow and keeps the number of concurrent iterations within the limit:

var async = require('async');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

var maxProcesses = 5; // 5 items at a time
async.eachLimit(
  folders, // collection
  maxProcesses, // limit
  function(folder, next) { // iterator function. args: item, callback
    var cmd = "tar -cf " + folder + ".tgz " + folder;
    console.log('calling:', cmd);
    exec(cmd, function(err, stdOut, stdErr) { // executing cmd
      if(err) console.error(err); // log any error to the console
      next(); // hand control back so the next iteration can start
    });
  },
  function() { // after all iterations finished
    console.log('finished processing commands');
  });

or with parallelLimit:

var async = require('async');
var _ = require('lodash');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

var callStack = [];
_.each(folders, function(folder) { // generating our stack of commands
  callStack.push(function(done) {
    var cmd = "tar -cf " + folder + ".tgz " + folder;
    exec(cmd, function(err, stdOut, stdErr) {
      if(err) console.error(err);
      done(null, folder);
    });
  });
});

var maxProcesses = 5; // 5 items at a time
async.parallelLimit(callStack, maxProcesses, function() {console.log('finished');});

"making it look shorter" :)

const
  async = require('async'),
  exec = require('child_process').exec;

let folders = [...]; 
async.eachLimit(folders, 5,
  (folder, next) =>
    exec("tar -cf " + folder + ".tgz " + folder, () => next()),
  () => console.log('finished'));

and

const
  async = require('async'),
  exec = require('child_process').exec;

let folders = [...]; 
let commands = folders.map(folder => done => exec("tar -cf " + folder + ".tgz " + folder, () => done()));
async.parallelLimit(commands, 5, () => console.log('finished'));



If none of these examples works for you, or your system is very large, consider using a message queue system like rsmq.
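As a rough illustration of that idea (a sketch only, written from memory of rsmq's callback API and not tested here), a producer could push one message per folder and a small pool of workers could pull them off the queue, so the queue itself paces the work. The queue name "tarjobs" and the Redis connection settings are assumptions for the example:

var RedisSMQ = require('rsmq');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
var rsmq = new RedisSMQ({ host: '127.0.0.1', port: 6379, ns: 'rsmq' }); // assumed local Redis

// producer: create the queue and push one message per folder
rsmq.createQueue({ qname: 'tarjobs' }, function() {
  folders.forEach(function(folder) {
    rsmq.sendMessage({ qname: 'tarjobs', message: folder }, function(err) {
      if (err) console.error(err);
    });
  });
});

// worker: poll the queue and process one message at a time
// (run e.g. 5 of these loops to get 5 concurrent processes)
function work() {
  rsmq.receiveMessage({ qname: 'tarjobs' }, function(err, msg) {
    if (err || !msg.id) return setTimeout(work, 1000); // queue empty, retry later
    exec('tar -cf ' + msg.message + '.tgz ' + msg.message, function() {
      rsmq.deleteMessage({ qname: 'tarjobs', id: msg.id }, work);
    });
  });
}
work();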

Answered by num8er.


Promises

I just love promises and love to stick them wherever possible.

Here is a solution I believe would work for your case.

var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
var promiseArr = [];

folders.forEach(function (folder) {
    var pr = {
        start: function () {
            if (pr.promise) return pr.promise;
            return pr.promise = new Promise(function (resolve) {
                exec("tar cvf " + folder + ".tgz " + folder,
                  undefined, (err, stdout, stderr) => {
                      // This is your logic, you can reject depending on err
                      var ind = promiseArr.indexOf(pr);
                      if (ind >= 0) promiseArr.splice(ind, 1);
                      resolve(stdout);
                  });
            });
        }
    };
    promiseArr.push(pr);
});

var racePromises = function () {
    if (!promiseArr.length) return;
    Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
    console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();

Short explanation

Create an array where each element represents a task. Start the first 5; whenever one of them completes, remove it from the array and again start up to 5 tasks from the front of the array.


Recreating eachLimit with promises just for fun

var myEachLimit = function (collection, maxConcurrentCalls, callback) {
    return new Promise(function (resolve, reject) {
        var promiseArr = [];

        collection.forEach(function (item) {
            var pr = {
                start: function () {
                    if (pr.promise) return pr.promise;
                    return pr.promise = new Promise(function (resolve) {
                        callback.call(item, item, function () {
                            var ind = promiseArr.indexOf(pr);
                            if (ind >= 0) promiseArr.splice(ind, 1);
                            resolve();
                        });

                    });
                }
            };
            promiseArr.push(pr);
        });

        var racePromises = function () {
            if (!promiseArr.length) {
                resolve();
                return;
            }
            Promise.race(promiseArr.slice(0, maxConcurrentCalls).map(x => x.start())).then(racePromises);
            console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
        }
        racePromises();
    });
}


// Demo

var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;

myEachLimit(folders, maxConcurrentProcessCount, function (folder, next) {
    exec("tar cvf " + folder + ".tgz " + folder, (err, stdout, stderr) => {
        next();
    });
}).then(function () {
    console.log("Finished all processes");
});
Answered by Gokhan Kurt.


Native JavaScript

All you need is some kind of load balancer. Put your loop into a separate function like so:

  /**
  * Loops through your folder array, beginning at the given index.
  * @param  {number} maxProcesses    [maximum number of processes you want to run concurrently.]
  * @param  {number} lastIndex       [index at which the last loop stopped.]
  * @param  {Array}  folderArray     [folder array.]
  */
  function loopFolders(maxProcesses, lastIndex, folderArray){

    // counter for our actual processes.
    let processes = 0;
    // This is to stop the loop, since forEach has no built-in break.
    let maximumReached = false;

    // loop through array.
    folderArray.forEach(function(element, index, array){

      // Only handle elements from the given start index onwards.
      if(index >= lastIndex && !maximumReached){

        // Only start another process if we are still below the limit.
        if(processes < maxProcesses){

          // create your child process.
          let exec = require('child_process').exec;
          // Do stuff with Folderelement from Folderarray.
          exec("tar cvf " + element + ".tgz " + element);

          // Increment processes.
          processes++;

        }else{
          /**
           * Magic begins here:
           * If max processes was reached, retry after a while.
           */

          // We are done for this loop.
           maximumReached = true;

           // Set timeout for 10 seconds and then retry.
          setTimeout(function(){
            // Call the function again.
            loopFolders(maxProcesses, index, array);
          }, 10000);
        }

      }

    });

  }

To start the loop from the beginning, call it like this:

// your Array of folders from somewhere.    
let folders = [...];
// Call loopFolders with a maximum of 5 processes and a starting index of 0.
loopFolders(5, 0, folders);

This code is a very basic example of a load balancer. Keep in mind that my example never knows whether any of the other processes have finished. You could use some kind of callback to be sure, but this should do the trick, at least as a starting point.

To make use of the Node.js child process events, take a look at https://nodejs.org/api/child_process.html

You could have the 'exit' event call back into the loop to make sure your child processes won't go out of control.
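A minimal sketch of that idea (my addition, assuming the same kind of folders array as in the question): keep a counter, start children only while it is below the limit, and decrement it in each child's 'exit' handler so a new one can start.

var exec = require('child_process').exec;

var folders = [...]; // a list from somewhere
var maxProcesses = 5;
var running = 0;
var index = 0;

function startNext() {
  // start children until the limit is reached or the list is exhausted
  while (running < maxProcesses && index < folders.length) {
    var folder = folders[index++];
    running++;
    var child = exec("tar cvf " + folder + ".tgz " + folder);
    // 'exit' fires when the child process ends, successfully or not
    child.on('exit', function() {
      running--;
      startNext(); // a slot is free again, start the next folder
    });
  }
}

startNext();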

Hope this helps.

Regards, Megajin

Answered by Megajin.