
Best way to call an API inside a for loop using Promises

I have 500 million objects, each of which has n contacts, as shown below:

var groupsArray = [
                    {'G1': ['C1','C2','C3'....]},
                    {'G2': ['D1','D2','D3'....]}
                     ...
                    {'G2000': ['D2001','D2002','D2003'....]}
                     ...
                ]

I have two implementations in Node.js: one based on regular promises and another using Bluebird, as shown below.

Regular promises

...
var groupsArray = [
                    {'G1': ['C1','C2','C3']},
                    {'G2': ['D1','D2','D3']}
                ]

function ajax(url) {
  return new Promise(function(resolve, reject) {
        request.get(url,{json: true}, function(error, data) {
            if (error) {
                reject(error);
            } else {
                resolve(data);  
            }
        });
    });
}
_.each(groupsArray,function(groupData){
    _.each(groupData,function(contactlists,groupIndex){
        // console.log(groupIndex)
        _.each(contactlists,function(contactData){
            ajax('http://localhost:3001/api/getcontactdata/'+groupIndex+'/'+contactData).then(function(result) {
                console.log(result.body);
              // Code depending on result
            }).catch(function() {
              // An error occurred
            });
        })
    })
})
...

Using the Bluebird approach, I have used the concurrency option to control the queue of promises:

...
var groups = []; // accumulate the transformed groups here
_.each(groupsArray,function(groupData){
    _.each(groupData,function(contactlists,groupIndex){
        var contacts = [];
        // console.log(groupIndex)
        _.each(contactlists,function(contactData){
            contacts.push({
                contact_name: 'Contact ' + contactData
            });
        })
        groups.push({
            task_name: 'Group ' + groupIndex,
            contacts: contacts
        });
    })
})

Promise.each(groups, group =>
    Promise.map(group.contacts, contact =>
        new Promise((resolve, reject) => {
            /*setTimeout(() =>
                resolve(group.task_name + ' ' + contact.contact_name), 1000);*/
            request.get('http://localhost:3001/api/getcontactdata/'+group.task_name+'/'+contact.contact_name,{json: true}, function(error, data) {
                if (error) {
                    reject(error);
                } else {
                    resolve(data);
                }
            });
        }).then(log => console.log(log.body)),
        {
            concurrency: 50
        }
    ).then(() => console.log())
).then(() => {
    console.log('All Done!!');
});
...

I want to know how to deal with 100 million API calls inside a loop using promises. Please advise the best way to call the API asynchronously and handle the responses later.

asked Mar 17 '17 by mymotherland

2 Answers

My answer uses regular Node.js promises (it can probably easily be adapted to Bluebird or another library).

You could fire off all Promises at once using Promise.all:

var groupsArray = [
                    {'G1': ['C1','C2','C3']},
                    {'G2': ['D1','D2','D3']}
                ];


function ajax(url) {
  return new Promise(function(resolve, reject) {
        request.get(url,{json: true}, function(error, data) {
            if (error) {
                reject(error);
            } else {
                resolve(data);  
            }
        });
    });
}

Promise.all(groupsArray.map(group => ajax("your-url-here")))
    .then(results => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });

Using Promise.all attempts to run all your requests in parallel. That won't work well when you have 500 million requests to make and they would all be attempted at the same time!

A more effective way to do it is to use the JavaScript reduce function to sequence your requests one after the other:

// ... Setup as before ...

const results = [];

groupsArray.reduce((prevPromise, group) => {
            return prevPromise.then(() => {
                return ajax("your-url-here")
                    .then(result => {
                        // Process a single result if necessary.
                        results.push(result); // Collect your results.
                    });
            });
        },
        Promise.resolve() // Seed promise.
    )
    .then(() => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });

This example chains together the promises so that the next one only starts once the previous one completes.

Unfortunately the sequencing approach will be very slow because it has to wait until each request has completed before starting a new one. Whilst each request is in progress (it takes time to make an API request) your CPU is sitting idle whereas it could be working on another request!

A more efficient, but more complicated, approach to this problem is to use a combination of the two approaches above: batch your requests so that the requests in each batch (of, say, 10) are executed in parallel, and then sequence the batches one after the other.

It's tricky to implement this yourself using a combination of Promise.all and the reduce function - although it's a great learning exercise (a rough sketch follows below) - so I'd suggest using the library async-await-parallel. There are a bunch of such libraries, but I use this one; it works well and easily does the job you want.
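For reference, a hand-rolled sketch of that batched approach, using only reduce and Promise.all, might look something like the following (the chunk helper and the batch size of 10 are illustrative assumptions, not code from any library; it reuses the ajax helper and groupsArray from above):

// Hypothetical sketch: split the work into batches, run each batch in
// parallel with Promise.all, and chain the batches with reduce.
const batchSize = 10;

// Simple chunking helper (illustrative only).
function chunk(items, size) {
    const batches = [];
    for (let i = 0; i < items.length; i += size) {
        batches.push(items.slice(i, i + size));
    }
    return batches;
}

const allResults = [];

chunk(groupsArray, batchSize)
    .reduce((prevBatch, batch) => {
        return prevBatch.then(() => {
            // Requests within a batch run in parallel...
            return Promise.all(batch.map(group => ajax("your-url-here")))
                .then(batchResults => {
                    allResults.push(...batchResults); // ...and their results are collected.
                });
        });
    }, Promise.resolve()) // Seed promise.
    .then(() => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });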

You can install the library like this:

npm install --save async-await-parallel

Here's how you would use it:

const parallel = require("async-await-parallel");

// ... Setup as before ...

const batchSize = 10;

parallel(groupsArray.map(group => {
        // We need to return a 'thunk' function, so that the jobs can be
        // started when they are needed, rather than all at once.
        return () => {
            return ajax("your-url-here");
        };
    }), batchSize)
    .then(() => {
        // Code that depends on all results.
    })
    .catch(err => {
        // Handle the error.
    });

This is better, but it's still a clunky way to make such a large number of requests! Maybe you need to up the ante and consider investing time in proper asynchronous job management.

I've been using Kue lately for managing a cluster of worker processes. Using Kue with the Node.js cluster library allows you to get proper parallelism happening on a multi-core PC, and you can then easily extend it to multiple cloud-based VMs if you need even more grunt.

See my answer here for some Kue example code.
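That linked example isn't reproduced here, but as a rough illustration of the idea, a minimal Kue producer/worker pair might look something like this (the 'fetch-contact' job type, the job data shape and the concurrency of 20 are assumptions; it reuses the ajax helper from above, and Kue also needs a running Redis instance):

const kue = require('kue');
const queue = kue.createQueue(); // connects to a local Redis by default

// Producer: enqueue one job per contact lookup (job type and data shape are assumptions).
queue.create('fetch-contact', { group: 'G1', contact: 'C1' })
    .attempts(3) // retry a failed job up to 3 times
    .save();

// Worker: process up to 20 jobs concurrently in this process.
queue.process('fetch-contact', 20, (job, done) => {
    ajax('http://localhost:3001/api/getcontactdata/' + job.data.group + '/' + job.data.contact)
        .then(result => done(null, result.body))
        .catch(done);
});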

answered Sep 19 '22 by Ashley Davis


In my opinion you have two problems coupled in one question - I'd decouple them.

#1 Loading of a large dataset

Operating on such a large dataset (500M records) will surely cause memory limit issues sooner or later - Node.js runs in a single thread and is limited to using roughly 1.5 GB of memory by default - after that your process will crash.

In order to avoid that, you could read your data as a stream from a CSV - I'll use scramjet, as it will also help with the second problem, but JSONStream or papaparse would do pretty well too:

$ npm install --save scramjet

Then let's read the data - I'd assume from a CSV:

const {StringStream} = require("scramjet");

const stream = require("fs")
    .createReadStream(pathToFile)
    .pipe(new StringStream('utf-8'))
    .csvParse()

Now we have a stream of objects that will return the data line by line, but only as we read it. That solves problem #1; now to "augment" the stream:

#2 Asynchronous augmentation of the stream data

No worries - that's just what you do: for every line of data you want to fetch some additional info from some API (that is, augment it), and that call is asynchronous by default.

That's where scramjet kicks in with just a couple of additional lines:

stream
    .flatMap(groupData => Object.entries(groupData))
    .flatMap(([groupIndex, contactList]) => contactList.map(contactData => ([contactData, groupIndex])))
    // now you have a simple stream of entries for your call
    .map(([contactData, groupIndex]) => ajax('http://localhost:3001/api/getcontactdata/'+groupIndex+'/'+contactData))
    // and here you can print or do anything you like with your data stream
    .each(console.log)

After this you'd need to accumulate the data or output it to a stream - there are a number of options - for example: .toJSONArray().pipe(fileStream).

Using scramjet you are able to separate the process into multiple lines without much impact on performance. Using setOptions({maxParallel: 32}) you can control the concurrency and, best of all, all of this will run with a minimal memory footprint - much, much faster than if you were to load the whole dataset into memory. A short sketch combining the two follows below.
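As a hedged sketch of how those two hints could be combined (the output file name is an assumption, the exact placement of setOptions in the chain is mine, and the ajax helper comes from the question), the augmentation chain above could end in a file instead of console.log:

const fs = require("fs");

stream
    .setOptions({maxParallel: 32}) // cap the number of concurrent API calls
    .flatMap(groupData => Object.entries(groupData))
    .flatMap(([groupIndex, contactList]) => contactList.map(contactData => ([contactData, groupIndex])))
    .map(([contactData, groupIndex]) => ajax('http://localhost:3001/api/getcontactdata/'+groupIndex+'/'+contactData))
    .toJSONArray() // serialize the augmented entries as a streamed JSON array...
    .pipe(fs.createWriteStream("augmented-contacts.json")); // ...and write them to disk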

Let me know if this is helpful - your question is quite complex, so if you run into any problems I'll be happy to help. :)

answered Sep 21 '22 by Michał Karpacki