Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Async Cursor Iteration with Asynchronous Sub-task

I want to perform an iteration over a mongoDB collection w/o numeric key(_id). The collection only has random strings as an _id, and the size of the collection is massive, thus loading up the whole documents on RAM using .toArray() is not a viable option. plus I want to perform asynchronous task on each element. the usage of .map() or .each(), .forEach() is limited because of the asynchronous nature of the task. I tried to run the task with those mentioned methods but it did of course conflicted with asynchronous task, returned pending promises instead of proper results.

example

async function dbanalyze(){

  let cursor = db.collection('randomcollection').find()
  for(;;){
    const el = cursor.hasNext() ? loaded.next() : null;
    if(!cursor) break
    await performAnalyze(cursor) // <---- this doesn't return a document but just a cursor object
  }

}

how can I iterate over mongoDB collection using only for()?

like image 819
Sungryeol Park Avatar asked Apr 16 '18 02:04

Sungryeol Park


People also ask

Can one async function have multiple awaits?

Using one try/catch block containing multiple await operations is fine when waiting for promises created on the right hand side of the await unary operator: The await operator stores its parent async functions' execution context and returns to the event loop.

Can you use async await with arrow function?

async is used to make a function asynchronous. It unlocks the use of await inside these functions. Using await in any other case is a syntax error. Notice the use of async keyword at the beginning of the function declaration. In the case of arrow function, async is put after the = sign and before the parentheses.

Does async need Athen or await?

We recommend using async/await where possible, and minimize promise chaining. Async/await makes JavaScript code more accessible to developers that aren't as familiar with JavaScript, and much easier to read.

Is async await better than then catch?

In my view, unless a library or legacy codebase forces you to use then/catch , the better choice for readability and maintainability is async/await .


1 Answers

The Cursor.hasNext() method is also "asynchronous", so you need to await that as well. Same goes for Cursor.next(). Therefore the actual "loop" usage really should be a while:

async function dbanalyze(){

  let cursor = db.collection('randomcollection').find()
  while ( await cursor.hasNext() ) {  // will return false when there are no more results
    let doc = await cursor.next();    // actually gets the document
    // do something, possibly async with the current document
  }

}

As noted in the comments, eventually Cursor.hasNext() will return false when the cursor is actually depleted, and the Cursor.next() is the thing that is actually retrieving each value from the cursor. You could do other structures and break the loop when hasNext() is false, but it more naturally lends itself to a while.

These are still "async", so you need to await the promise resolution on each, and that was the main fact you were missing.

As for Cursor.map(), then you are probably missing the point that it can be marked with an async flag on the provided function as well:

 cursor.map( async doc => {                   // We can mark as async
    let newDoc = await someAsyncMethod(doc);  // so you can then await inside
    return newDoc;
 })

But you still actually want to "iterate" that somewhere, unless you can get away with using .pipe() to some other output destination.

Also the async/await flags also make Cursor.forEach() "more practical again", as it's one common flaw was not being able to simply handle an "inner" asynchronous call, but with these flags you can now do so with ease, though admittedly since you must use a callback, you probably want to wrap this in a Promise :

await new Promise((resolve, reject) => 
  cursor.forEach(
    async doc => {                              // marked as async
      let newDoc = await someAsyncMethod(doc);  // so you can then await inside
      // do other things
    },
    err => {
      // await was respected, so we get here when done.
      if (err) reject(err);
      resolve();
    }
  )
);

Of course there has always been ways to apply this with either callbacks or plain Promise implementations, but it's the "sugar" of async/await than actually makes this look much cleaner.

NodeJS v10.x and MongoDB Node driver 3.1.x and up

And the favorite version uses AsyncIterator which is now enabled in NodeJS v10 and upwards. It's a much cleaner way to iterate

async function dbanalyze(){

  let cursor = db.collection('randomcollection').find()
  for await ( let doc of cursor ) {
    // do something with the current document
  }    
}

Which "in a way" comes back to what the question originally asked as to using a for loop since we can do the for-await-of syntax here fore supporting iterable which supports the correct interface. And the Cursor does support this interface.


If you're curios, here's a listing I cooked up some time ago to demonstrate various cursor iteration techniques. It even includes a case for Async Iterators from a generator function:

const Async = require('async'),
      { MongoClient, Cursor } = require('mongodb');

const testLen = 3;
(async function() {

  let db;

  try {
    let client = await MongoClient.connect('mongodb://localhost/');

    let db = client.db('test');
    let collection = db.collection('cursortest');

    await collection.remove();

    await collection.insertMany(
      Array(testLen).fill(1).map((e,i) => ({ i }))
    );

    // Cursor.forEach
    console.log('Cursor.forEach');
    await new Promise((resolve,reject) => {
      collection.find().forEach(
        console.log,
        err => {
          if (err) reject(err);
          resolve();
        }
      );
    });

    // Async.during awaits cursor.hasNext()
    console.log('Async.during');
    await new Promise((resolve,reject) => {

      let cursor = collection.find();

      Async.during(
        (callback) => Async.nextTick(() => cursor.hasNext(callback)),
        (callback) => {
          cursor.next((err,doc) => {
            if (err) callback(err);
            console.log(doc);
            callback();
          })
        },
        (err) => {
          if (err) reject(err);
          resolve();
        }
      );

    });

    // async/await allows while loop
    console.log('async/await while');
    await (async function() {

      let cursor = collection.find();

      while( await cursor.hasNext() ) {
        let doc = await cursor.next();
        console.log(doc);
      }

    })();

    // await event stream
    console.log('Event Stream');
    await new Promise((end,error) => {
      let cursor = collection.find();

      for ( let [k,v] of Object.entries({ end, error, data: console.log }) )
        cursor.on(k,v);
    });

    // Promise recursion
    console.log('Promise recursion');
    await (async function() {

      let cursor = collection.find();

      function iterate(cursor) {
        return cursor.hasNext().then( bool =>
          (bool) ? cursor.next().then( doc => {
            console.log(doc);
            return iterate(cursor);
          }) : Promise.resolve()
        )
      }

      await iterate(cursor);

    })();

    // Uncomment if node is run with async iteration enabled
    // --harmony_async_iteration


    console.log('Generator Async Iterator');
    await (async function() {

      async function* cursorAsyncIterator() {
        let cursor = collection.find();

        while (await cursor.hasNext() ) {
          yield cursor.next();
        }

      }

      for await (let doc of cursorAsyncIterator()) {
        console.log(doc);
      }

    })();


    // This is supported with Node v10.x and the 3.1 Series Driver
    await (async function() {

      for await (let doc of collection.find()) {
        console.log(doc);
      }

    })();

    client.close();

  } catch(e) {
    console.error(e);
  } finally {
    process.exit();
  }

})();
like image 58
Neil Lunn Avatar answered Oct 26 '22 23:10

Neil Lunn