Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it possible to build a dynamic task list in nodejs Async (waterfall, series, etc...)

I am pulling information from some collections in Mongo that contain node and edge data. First I must get the node so I can grab its edges. Once I have a list of edges, I then go back out and grab more nodes (and so on, based on a depth value). The following code is a loose example of how I am attempting to use async.waterfall and the task list.

Initially I have only a single task, but once I make my first query I add to the task array. Unfortunately, this does not seem to register with async, and it does not continue to process the tasks I am adding.

Is there a better way to do this?

// Third-party dependencies used below: async (task sequencing), mongoose (schemas/models),
// underscore (collection helpers such as _.map and _.uniq).
var async = require('async')
var mongoose = require('mongoose')
var _ = require('underscore')

// Traversal bookkeeping: addTask keeps queueing rounds while depth > round.
var funcs = []
var round = 1
var depth = 2

// Filters handed to queryData: one for the node lookup, one for the edge lookup.
var query = {
    node : { id : '12345' },
    edge : { id : '12345' }
}

// Connection handle that the models are registered on.
// NOTE(review): this.Mongo is not defined in this listing — confirm where it comes from.
var Client = this.Mongo.connect(/*path to mongo*/)

// A node carries its own id plus the id of the graph it belongs to.
var Node = mongoose.Schema({
    id : String,
    graph_id : String
})

// An edge links a source node id to a destination node id.
var Edge = mongoose.Schema({
    id : String,
    source_id : String,
    destination_id : String
})

var Nodes = Client.model('myNode', Node)
var Edges = Client.model('myEdge', Edge)

/**
 * Builds a task (a function taking a single completion callback) that runs one
 * round of queryData and, while more rounds are allowed, queues the task for the
 * next round on function_array.
 *
 * @param Nodes  model passed through to queryData for the node lookup
 * @param Edges  model passed through to queryData for the edge lookup
 * @param query  query object passed through to queryData
 * @param round  number of the round this task represents
 * @param depth  a follow-up task is queued only while depth > round
 * @returns {Function} task of the form function(callback)
 */
var addTask = function(Nodes, Edges, query, round, depth) {
    return function(callback) {
        queryData(Nodes, Edges, query, function(err, node_list) {
            if(err) {
                // Surface the failure instead of dropping it.
                return callback(err)
            }
            if(depth > round) {
                round++
                function_array.push(addTask(Nodes, Edges, query, round, depth))
            }
            // Always report completion; without this call the task runner never
            // advances and its final callback never fires.
            callback(null)
        })
    }
}

/**
 * Looks up the nodes matching query.node, then the edges that touch any of those
 * nodes, and reports the de-duplicated ids found at the other end of those edges.
 *
 * @param Nodes  object exposing find(conditions, callback) for nodes
 * @param Edges  object exposing find(conditions, callback) for edges
 * @param query  { node: <node conditions>, edge: <edge conditions> }; not modified
 * @param cb     function(err, ids) — ids is the unique list of neighbouring ids
 */
var queryData = function(Nodes, Edges, query, cb) {
    async.waterfall([
        function(callback) {
            Nodes.find(query.node, function(err, nodes) {
                if(err) {
                    return callback(err)
                }
                var node_keys = _.map(nodes, function(node) {
                    return node.id
                })
                callback(null, nodes, node_keys)
            })
        },
        function(nodes, node_keys, callback) {
            // Build the edge filter on a copy so the caller's query object is not
            // left carrying a $or clause from this round.
            var edge_query = _.extend({}, query.edge, {
                $or : [ {'source_id' : {$in:node_keys}}, {'destination_id' : {$in:node_keys}} ]
            })
            Edges.find(edge_query, function(err, edges) {
                if(err) {
                    return callback(err)
                }
                // For each edge keep the endpoint that is not the queried node.
                // NOTE(review): reads the raw document under _doc — confirm plain
                // property access on the edge would not be enough.
                var edge_keys = _.map(edges, function(edge) {
                    if(edge['_doc']['source_id'] != query.node.id) {
                        return edge['_doc']['source_id']
                    }
                    return edge['_doc']['destination_id']
                })
                // Must run after the map has finished, not inside its iterator.
                callback(null, nodes, edges, node_keys, edge_keys)
            })
        }
    ], function(err, nodes, edges, node_keys, edge_keys) {
        if(err) {
            return cb(err)
        }
        // update the results object then...
        cb(null, _.uniq(edge_keys))
    })
}

// Start with the first round only; the task itself pushes follow-up rounds onto this array.
var function_array = [ addTask(Nodes, Edges, query, round, depth) ]

async.waterfall(function_array, function(err) {
    //this should have run more than just the initial task but does not
    Client.disconnect()
})

--------------------- UPDATE ---------------------------

So after playing around with trying to get Async waterfall or series to do this by adding trailing functions I decided to switch to using async.whilst and am now happy with the solution.

/**
 * Holds a queue of callback-style tasks (function_array) that may add further
 * tasks to the queue while they run.
 */
function GraphObject() {
  this.function_array = []
}

/**
 * Demo entry point: queues one self-extending task and drains the queue.
 */
GraphObject.prototype.doStuff = function() {
  this.function_array.push(this.buildFunction(100))
  this.runTasks(function(err) {
    if(err) {
      console.log('tasks failed: ' + err)
      return
    }
    console.log('done with all tasks')
  })
}

/**
 * Builds a task that queues another task while `times` is positive.
 * The task must be invoked with `this` set to the GraphObject (runTasks does so).
 *
 * @param times  how many further tasks the returned task's chain will queue
 * @returns {Function} task of the form function(cb)
 */
GraphObject.prototype.buildFunction = function(times) {
  return function(cb) {
    // "> 0" rather than "!= 0" so a negative or fractional count cannot chain forever.
    if(times > 0) {
      this.function_array.push(this.buildFunction(times - 1))
    }
    cb(null)
  }
}

/**
 * Runs queued tasks one at a time, in order, until the queue is empty or a task
 * reports an error. Tasks queued while running are picked up as well.
 *
 * @param cb  function(err) — called exactly once: with 500 if any task failed,
 *            otherwise with null
 */
GraphObject.prototype.runTasks = function(cb) {
  var tasks_run = 0
  async.whilst(
    function(){
      return this.function_array.length > 0
    }.bind(this),
    function(callback) {
      var func = this.function_array.shift()
      func.call(this, function(err) {
        tasks_run++
        callback(err)
      })
    }.bind(this),
    function(err) {
      console.log('runTasks ran '+tasks_run+' tasks')
      if(err) {
        // Return here so the success callback below is not fired as well.
        return cb(500)
      }
      cb(null)
    }
  )
}
like image 834
Jack.Crash Avatar asked Oct 01 '22 16:10

Jack.Crash


1 Answers

A task in your function_array can only add a new task to the array provided it is NOT the last task in the array.

In your case, your function_array contained only 1 task. That task itself cannot add additional tasks since it's the last task.

The solution is to have 2 tasks in the array. A startTask to bootstrap the process, and a finalTask that is more of a dummy task. In that case,

function_array = [startTask, finalTask];

Then startTask would add taskA, taskA would add taskB, taskB would add taskC, and eventually

function_array = [startTask, taskA, taskB, taskC, finalTask];

The sample code below illustrates the concept.

    var async = require('async');
    // Tasks whose id is below this value insert one more follow-up task.
    var max = 6;

    // Node step: draws a number from 1 to 20, logs it, optionally schedules an
    // edge step just ahead of the last task, then passes on the next id and the
    // running total.
    var nodeTask = function(taskId, value, callback){
        var roll = 1 + Math.floor(Math.random() * 20);
        console.log("From Node Task %d: %d", taskId, roll);

        // Below the cap: slot an edge task in right before the final entry.
        var mayExtend = taskId < max;
        if (mayExtend) {
            function_array.splice(-1, 0, edgeTask);
        }

        var nextId = taskId + 1;
        var runningTotal = value + roll;
        callback(null, nextId, runningTotal);
    };

    // Edge step: mirrors the node step, but schedules a node task as its follow-up.
    var edgeTask = function(taskId, value, callback){
        var drawn = Math.floor(Math.random() * 20) + 1;
        console.log("From Edge Task %d: %d", taskId, drawn);

        // Once the id reaches the cap, no further node task is inserted.
        if (!(taskId >= max)) {
            var insertAt = function_array.length - 1;
            function_array.splice(insertAt, 0, nodeTask);
        }

        callback(null, taskId + 1, drawn + value);
    };

    // Bootstrap step: schedules the first node task ahead of the last entry and
    // seeds the chain with task id 1 and a running total of 0.
    var startTask = function(callback) {
        var beforeLast = function_array.length - 1;
        function_array.splice(beforeLast, 0, nodeTask);
        callback(null, 1, 0);
    };

    // Closing step: drops the task id and hands only the accumulated value on.
    var finalTask = function(taskId, value, callback) {
        var noError = null;
        callback(noError, value);
    };

    // Two seed entries: the other tasks insert new work just before the last
    // entry, so finalTask always stays at the end of the list.
    var function_array = [];
    function_array.push(startTask, finalTask);

    async.waterfall(function_array, function (err, sum) {
        console.log("Sum is ", sum);
    });
like image 68
Kelvin Yong Avatar answered Oct 05 '22 12:10

Kelvin Yong