I am having difficulty using fs.createReadStream to process my CSV file asynchronously:
async function processData(row) {
  // perform some asynchronous function
  await someAsynchronousFunction();
}

fs.createReadStream('/file')
  .pipe(parse({
    delimiter: ',',
    columns: true
  }))
  .on('data', async (row) => {
    await processData(row);
  })
  .on('end', () => {
    console.log('done processing!');
  });
I want to perform some asynchronous function after reading each record, one by one, before the createReadStream reaches on('end').
However, the on('end') gets hit before all of my data finishes processing. Does anyone know what I might be doing wrong?
Thanks in advance!
.on('data', ...) does not wait for your await. Remember, an async function returns a promise immediately, and .on() pays no attention to that promise, so it just keeps going.
The await only waits inside the function. It does not stop your function from returning immediately, so the stream thinks you've processed the data and keeps sending more data and generating more data events.
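To make the point above concrete, here is a minimal sketch using a plain EventEmitter rather than a real file stream. The sleep helper and the demoUnawaitedListener name are made up for illustration; they stand in for the asynchronous work in the question.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stand-in for the asynchronous work done per row.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function demoUnawaitedListener() {
  return new Promise((resolve) => {
    const log = [];
    const emitter = new EventEmitter();

    // The async listener returns a promise, but emit() never looks at it.
    emitter.on('data', async (row) => {
      await sleep(10);
      log.push(`processed ${row}`);
    });

    emitter.on('end', () => {
      log.push('end');
      // Wait a little so the still-pending listeners can finish and be logged.
      setTimeout(() => resolve(log), 50);
    });

    emitter.emit('data', 1);
    emitter.emit('data', 2);
    emitter.emit('end');
  });
}

demoUnawaitedListener().then((log) => console.log(log));
```

The 'end' entry is logged before either row is processed, which is the same ordering problem described in the question.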
There are several possible approaches here, but the simplest might be to pause the stream until processData() is done and then restart the stream.
Also, does processData() return a promise that is linked to the completion of the async operation? That is also required for await to be able to do its job.
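As a rough illustration of what "linked to the completion of the async operation" means, the sketch below contrasts two versions of a per-row function. saveRow is a hypothetical callback-style operation invented for this example, and the other function names are assumptions too.

```javascript
// Hypothetical callback-style operation, used only for illustration.
function saveRow(row, callback) {
  setTimeout(() => callback(null, `saved ${row}`), 10);
}

// Not linked: the returned promise resolves before saveRow() has finished,
// so awaiting it does not wait for the actual work.
async function processDataUnlinked(row, log) {
  saveRow(row, () => log.push(`finished ${row}`));
}

// Linked: the returned promise settles only when the callback fires.
function processDataLinked(row, log) {
  return new Promise((resolve, reject) => {
    saveRow(row, (err, result) => {
      if (err) return reject(err);
      log.push(`finished ${row}`);
      resolve(result);
    });
  });
}

async function demoLinkedPromise() {
  const log = [];
  await processDataUnlinked('a', log);
  log.push('after unlinked await');
  await processDataLinked('b', log);
  log.push('after linked await');
  return log;
}

demoLinkedPromise().then((log) => console.log(log));
```

In the unlinked case the code after await runs before the row is finished; in the linked case it runs afterwards.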
The readable stream doc contains an example of pausing the stream during a data event and then resuming it after some asynchronous operation finishes. Here's their example:
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
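Applied to the question, the same pause/resume idea might look like the sketch below. It is not a definitive implementation: processSequentially and demoPauseResume are hypothetical names, and an in-memory Readable.from() stream stands in for fs.createReadStream('/file').pipe(parse(...)). The sketch also keeps a reference to the in-flight promise, on the assumption that 'end' may be emitted while the last row is still being processed if the source has already finished reading.

```javascript
const { Readable } = require('node:stream');

// Runs handler(row) for each row, one at a time, by pausing the stream
// while a row is being processed. Resolves once the stream has ended
// and the last handler call has completed.
function processSequentially(stream, handler) {
  return new Promise((resolve, reject) => {
    let inFlight = Promise.resolve();

    stream.on('data', (row) => {
      stream.pause(); // no further 'data' events until resume()
      inFlight = handler(row).then(() => stream.resume());
      inFlight.catch((err) => stream.destroy(err));
    });

    // Wait for the last row before reporting completion.
    stream.on('end', () => inFlight.then(resolve, reject));
    stream.on('error', reject);
  });
}

async function demoPauseResume() {
  const processed = [];
  // Stand-in for the parsed CSV stream from the question.
  const rows = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);

  await processSequentially(rows, async (row) => {
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulated async work
    processed.push(row.id);
  });

  console.log('done processing!', processed);
  return processed;
}

demoPauseResume();
```

With a real CSV stream, the handler passed in would be the processData function from the question, provided it returns a promise tied to its async work.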