
How to asynchronously createReadStream in node.js with async/await

I am having difficulty using fs.createReadStream to process my CSV file asynchronously:

async function processData(row) {
    // perform some asynchronous function
    await someAsynchronousFunction();
}

fs.createReadStream('/file')
    .pipe(parse({
        delimiter: ',',
        columns: true
    })).on('data', async (row) => {
        await processData(row);
    }).on('end', () => {
        console.log('done processing!')
    })

I want to run an asynchronous function on each record, one record at a time, before the stream reaches on('end').

However, the on('end') gets hit before all of my data finishes processing. Does anyone know what I might be doing wrong?

Thanks in advance!

John Grayson asked Dec 11 '22 06:12


1 Answer

.on('data', ...) does not wait for your await. Remember, an async function returns a promise immediately, and .on() pays no attention to that promise, so the stream just keeps merrily going on.

The await only waits inside the function; it does not stop your function from returning immediately. The stream therefore thinks you've processed the data and keeps sending more data and generating more data events.
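To make that concrete, here is a minimal sketch that uses a plain EventEmitter as a stand-in for the stream. The `demo` function, the three sample rows, and the 10 ms timer are assumptions for illustration and are not part of the question's code. It records the order in which the listeners actually finish:

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stand-in for the CSV stream: emits three rows, then 'end'.
function demo() {
  return new Promise((resolve) => {
    const log = [];
    const emitter = new EventEmitter();

    emitter.on('data', async (row) => {
      // Simulated asynchronous work; emit() does not wait for this.
      await new Promise((r) => setTimeout(r, 10));
      log.push(`processed ${row}`);
      if (log.length === 4) resolve(log);
    });

    emitter.on('end', () => log.push('end'));

    // emit() calls each listener synchronously and ignores returned promises.
    for (const row of [1, 2, 3]) emitter.emit('data', row);
    emitter.emit('end');
  });
}

demo().then((log) => console.log(log));
```

In this sketch 'end' is logged before any row has finished processing, which is the same symptom described in the question.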

There are several possible approaches here, but the simplest might be to pause the stream until processData() is done and then restart the stream.
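One of the other possible approaches, not the one recommended in this answer, is to consume the stream with for await...of, since Node.js readable streams are async iterable. The sketch below is an assumption-laden illustration: `processRows` is a hypothetical helper name, and `Readable.from` stands in for the question's `fs.createReadStream('/file').pipe(parse({...}))` so the example runs without a CSV file.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: handles rows strictly one at a time.
async function processRows(stream, processData) {
  for await (const row of stream) {
    // The loop does not request the next row until this promise settles.
    await processData(row);
  }
}

// Usage with a stand-in stream of already-parsed rows.
async function main() {
  const rows = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
  await processRows(rows, async (row) => {
    await new Promise((r) => setTimeout(r, 10)); // simulated async work
    console.log('processed', row.id);
  });
  console.log('done processing!');
}

main();
```

Because the loop itself awaits each row, the code after the loop plays the role of the 'end' handler and only runs once every row has been processed.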

Also, does processData() return a promise that is linked to the completion of the async operation? That is also required for await to be able to do its job.
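As a rough illustration of what "linked to the completion" means, the sketch below compares two hypothetical functions (`notLinked`, `linked`, and `compare` are made-up names, and the 10 ms timer stands in for real asynchronous work). Both return a promise, but only one resolves when the work is actually finished:

```javascript
// Not linked: the async function returns right away, so its promise
// resolves before the timer callback has run.
async function notLinked(log) {
  setTimeout(() => log.push('work done'), 10);
}

// Linked: the promise resolves only from inside the timer callback.
function linked(log) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push('work done');
      resolve();
    }, 10);
  });
}

// Snapshot each log immediately after the await returns.
async function compare() {
  const a = [];
  await notLinked(a);
  const afterNotLinked = [...a];

  const b = [];
  await linked(b);
  const afterLinked = [...b];

  return { afterNotLinked, afterLinked };
}

compare().then((result) => console.log(result));
```

Awaiting `notLinked` returns before the work has happened, so pausing and resuming the stream around it would not help; awaiting `linked` returns only after the work is done.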

The readable stream doc contains an example of pausing the stream during a data event and then resuming it after some asynchronous operation finishes. Here's their example:

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
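
Adapting that pattern to an async processData() might look like the sketch below. It is an illustration under assumptions, not a definitive implementation: `processSequentially` is a hypothetical helper, and `Readable.from` stands in for the parsed CSV stream from the question. Depending on how the stream buffers its input, 'end' may still be emitted while the last row is being processed, so the sketch also tracks in-flight work before reporting completion.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: resolves once the stream has ended AND the last
// row has finished processing.
function processSequentially(stream, processData) {
  return new Promise((resolve, reject) => {
    let busy = false;  // true while a row is being processed
    let ended = false; // true once 'end' has been emitted
    const finishIfIdle = () => {
      if (ended && !busy) resolve();
    };

    stream.on('data', async (row) => {
      stream.pause(); // stop further 'data' events while we work
      busy = true;
      try {
        await processData(row);
      } catch (err) {
        stream.destroy(err); // surfaces through the 'error' listener
        return;
      }
      busy = false;
      stream.resume(); // let the next row through
      finishIfIdle();
    });

    stream.on('end', () => {
      ended = true;
      finishIfIdle();
    });
    stream.on('error', reject);
  });
}

// Usage with a stand-in stream of already-parsed rows.
const rows = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
processSequentially(rows, async (row) => {
  await new Promise((r) => setTimeout(r, 10)); // simulated async work
  console.log('processed', row.id);
}).then(() => console.log('done processing!'));
```

The pause() call happens synchronously inside the data handler, before the first await, which is what keeps a second row from arriving while the first is still in progress.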
jfriend00 answered May 21 '23 18:05