
Node.js: can you use asynchronous functions from within streams?

Consider the following:

    var asyncFunction = function (data, callback) {
      doAsyncyThing(function (err, data) {
        // do some stuff
        return callback(err)
      })
    }

    fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
      .pipe(JSONstream.parse())
      .on('data', asyncFunction) // <- how to let asyncFunction complete before continuing

How does the stream know when asyncFunction has completed? Is there any way to use asynchronous functions from within streams?

Fergie asked Apr 16 '16 08:04


1 Answer

Check out transform streams. They let you run async code on a chunk and then call a callback when you are finished; the stream does not hand you the next chunk until that callback has been called, which is how it knows your async work is done. Here are the docs: https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback

As a simple example, you can do something like:

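    const fs = require('fs')
    const JSONstream = require('JSONStream') // npm package, same identifier as in the question
    const Transform = require('stream').Transform

    class WorkerThing extends Transform {
        constructor() {
            // JSONstream.parse() emits objects, so the transform must run in object mode
            super({ objectMode: true })
        }

        _transform(chunk, encoding, cb) {
            // the next chunk is not delivered until cb is called
            asyncFunction(chunk, cb)
        }
    }

    const workerThing = new WorkerThing()

    fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
        .pipe(JSONstream.parse())
        .pipe(workerThing)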
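If you want to see the ordering for yourself without the JSON file, here is a minimal sketch that uses only Node's built-in stream module. It makes some assumptions: `Readable.from` stands in for the file reader and JSON parser, the timer-based `asyncFunction` is a hypothetical placeholder for the real work, and `createWorker` and `runDemo` are illustrative names that are not part of any library.

```javascript
const { Readable, Transform, pipeline } = require('stream')

// Builds a Transform that runs `worker(chunk, cb)` on every chunk.
// The stream calls transform() for the next chunk only after `cb` has run.
function createWorker (worker) {
  return new Transform({
    objectMode: true, // chunks are plain objects, not Buffers or strings
    transform (chunk, encoding, cb) {
      worker(chunk, cb)
    }
  })
}

// Demo: three objects, each "processed" by a timer-based async function.
// Resolves with a log of the order in which work started and ended.
function runDemo () {
  const log = []

  // Hypothetical async work standing in for the question's asyncFunction.
  const asyncFunction = (data, callback) => {
    log.push(`start ${data.id}`)
    setTimeout(() => {
      log.push(`end ${data.id}`)
      callback(null) // pass an Error instead to fail the whole pipeline
    }, 10)
  }

  return new Promise((resolve, reject) => {
    const worker = createWorker(asyncFunction)
    pipeline(
      Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]),
      worker,
      (err) => (err ? reject(err) : resolve(log))
    )
    // Nothing consumes the transform's output here, so drain its readable side.
    worker.resume()
  })
}

runDemo().then((log) => console.log(log.join(', ')))
// prints: start 1, end 1, start 2, end 2, start 3, end 3
```

Each "start" appears only after the previous "end", so chunks are processed one at a time. Using `pipeline` rather than bare `.pipe()` is a design choice for the sketch: it gives a single place to learn that the run finished or failed.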
Ryan Quinn answered Oct 25 '22 20:10