Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

nodejs wait until all MongoDB calls in loop finish

I'm reading a data stream from a CSV file row by row and making a MongoDB findOne call for each row. How can I wait until the Mongo calls for every row are complete before I run the next function?

I've seen Promises can do it? But I find Promises extremely difficult to understand. And none of the examples I've found seem to cover what I'm trying to do. :/

// Products that were found and updated; filled in by the findOne callbacks
// further down, not synchronously.
var validProducts = [];

// Read the uploaded file and pipe it through csvStream; per the question,
// each 'data' event then carries one CSV row.
fs.createReadStream(req.file.path)
  .pipe(csvStream)
  .on('error', function (err) {
    console.error(err);
  })
  // loop through all rows
  .on('data', function (data) {
    // Only rows whose size is 'a3' trigger a database lookup.
    if (data.size === 'a3') {
      // Look the product up by sku; the result is delivered to the callback.
      ProductModel.findOne({ sku: data.sku }, function (err, product) {
        // NOTE(review): err is never inspected in this callback.
        if (product !== null) {
          product.size = data.size;
          // NOTE(review): save() is called without a callback, so its
          // completion or failure is not observed here.
          product.save();
          validProducts.push(product);
        }
      });
    }
  });

// on finish make call to other function
// NOTE: as written, these two statements run right after the stream handlers
// are registered -- they are not tied to any stream event or findOne
// callback. This is the problem the question is asking about.
socket.emit({ 'status': 'complete' });
otherFunction(validProducts);

on('finish') or on('end') will only be called at the end of the data stream, not after the Mongo calls.

If I can use promises, can someone please explain how?

like image 260
Nicekiwi Avatar asked Nov 12 '15 04:11

Nicekiwi


2 Answers

You could use the Q library, which allows you to create promises. It has a useful function, Q.all, that lets you wait for an array of promises to be resolved. Here is an example of how you could solve your problem with Q.all:

// One promise per row that triggered a lookup; waited on once the stream ends.
var promises = [];
// Products that were found and updated by those lookups.
var validProducts = [];

/**
 * Stream 'data' handler: for every row whose size is 'a3', looks the product
 * up by sku and records a promise for that lookup in `promises`.
 *
 * The promise is always settled exactly once:
 *   - rejected with an Error when findOne reports an error,
 *   - resolved with null when no product matches the sku,
 *   - resolved with the product after it has been updated and pushed onto
 *     `validProducts`.
 *
 * @param {Object} data - One parsed row; only `size` and `sku` are read.
 */
function handleData(data) {
    // Rows of any other size need no lookup and contribute no promise.
    if (data.size !== 'a3') {
        return;
    }

    var deferred = Q.defer();

    ProductModel.findOne({ sku: data.sku }, function (err, product) {
        if (err) {
            // Keep the original Error (and its stack) when there is one.
            deferred.reject(err instanceof Error ? err : new Error(err));
            return;
        }

        if (!product) {
            // No match: settle anyway, otherwise Q.all(promises) would
            // wait forever on this row.
            deferred.resolve(null);
            return;
        }

        product.size = data.size;
        // NOTE(review): save() is still fire-and-forget; its completion or
        // failure is not reflected in the promise.
        product.save();
        validProducts.push(product);
        deferred.resolve(product);
    });

    promises.push(deferred.promise);
}

/**
 * Stream 'end' handler: waits for every lookup promise collected in
 * `promises`, then reports completion and hands `validProducts` on to
 * otherFunction.
 *
 * If any lookup promise was rejected, the error is logged instead, and
 * neither the 'complete' status nor otherFunction is triggered.
 */
function handleEnd() {
    Q.all(promises).done(
        function () {
            socket.emit({ 'status': 'complete' });
            otherFunction(validProducts);
        },
        function (err) {
            // Without a rejection handler the failure would surface as an
            // uncaught exception.
            console.error(err);
        }
    );
}

// Pipe the uploaded file through csvStream (as in the question) so that
// handleData receives parsed rows rather than raw file chunks, log stream
// errors, and call handleEnd once the last row has been delivered.
fs.createReadStream(req.file.path)
  .pipe(csvStream)
  .on('error', function (err) {
    console.error(err);
  })
  .on('data', handleData)
  .on('end', handleEnd);
like image 77
prichrd Avatar answered Nov 10 '22 11:11

prichrd


Use pause/resume

  .on('data', function (data) {
    if (data.size === 'a3') {
      this.pause(); // pause it
      var stream = this; // save 'this'
      ProductModel.findOne({ sku: data.sku }, function (err, product) {
        if (product !== null) {
          product.size = data.size;
          product.save();
          validProducts.push(product);
        }
        stream.resume(); //resume it
      });
    }
  });
like image 3
Jaromanda X Avatar answered Nov 10 '22 12:11

Jaromanda X