Is functional programming in node.js general enough? Can it be used for a real-world problem such as handling small batches of DB records, without loading all the records into memory via toArray (and thus running out of memory)? You can read this criticism for background. We want to demonstrate the Mux/DeMux and fork/tee/join capabilities of such node.js libraries with async generators.
I'm questioning the validity and generality of functional programming in node.js using any functional programming tool (like ramda, lodash, and imlazy) or even custom code.
The data is millions of records from a MongoDB cursor that can be iterated using await cursor.next(). You might want to read more about async generators and for-await-of.
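As a minimal sketch, such a cursor (e.g. the result of a hypothetical collection.find()) can be wrapped as an async generator so it can be consumed with for-await-of:

// Wrap `await cursor.next()` iteration in an async generator; only the
// current document is ever held in memory.
async function* cursorToAsyncGen(cursor) {
  while (await cursor.hasNext()) {
    yield await cursor.next();
  }
}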
For fake data one can use (on node 10):
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Simulates a slow source of n documents, yielding one at a time.
async function* getDocs(n) {
  for (let i = 0; i < n; ++i) {
    await sleep(1);
    yield { i: i, t: Date.now() };
  }
}

let docs = getDocs(1000000);
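As a quick illustration, the stream can then be consumed lazily with for-await-of, one document at a time:

(async () => {
  for await (const doc of getDocs(3)) {
    console.log(doc); // logs { i: 0, t: ... }, { i: 1, t: ... }, { i: 2, t: ... }
  }
})();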
We need to:

- Make sure that the first and last documents are included in the batches and not consumed separately.
- Avoid loading the millions of records into RAM; iterate over them, holding at most one batch at a time.
The requirement can be met using usual node.js code (a plain for-await-of version is sketched after the snippet below), but can it be done using something like R.applySpec, as in the following?
R.applySpec({
  first: R.head,
  last: R.last,
  _: R.pipe(
    R.splitEvery(n),
    R.map(i => "emit " + JSON.stringify(i))
  ),
})(input)
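For reference, here is a minimal imperative sketch of the "usual node.js code" mentioned above (batchDocs and emitBatch are hypothetical names; emitBatch stands in for whatever consumes a batch):

// Keep first/last, emit every `n` docs, and hold at most one batch in memory.
async function batchDocs(asyncGen, n, emitBatch) {
  let first = null, last = null, batch = [];
  for await (const doc of asyncGen) {
    if (first === null) first = doc;
    last = doc;
    batch.push(doc);
    if (batch.length === n) {
      await emitBatch(batch);
      batch = [];
    }
  }
  if (batch.length > 0) await emitBatch(batch); // flush the remainder
  return { first, last };
}

// batchDocs(getDocs(100), 10, b => Promise.resolve(console.log(b)))
//   .then(({ first, last }) => console.log('done', first, last));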
To show how this could be modeled with vanilla JS, we can introduce the idea of folding over an async generator that produces things that can be combined together.
// Fold an async generator: lift each value with `of`, combine with `concat`
// starting from `empty`, run `step` after each combination (e.g. to flush),
// and run `fin` once the generator is exhausted.
const foldAsyncGen = (of, concat, empty) => (step, fin) => async asyncGen => {
  let acc = empty
  for await (const x of asyncGen) {
    acc = await step(concat(acc, of(x)))
  }
  return await fin(acc)
}
Here the arguments are broken up into three parts:

- (of, concat, empty) expects a function to produce a "combinable" thing, a function that will combine two "combinable" things, and an empty/initial instance of a "combinable" thing
- (step, fin) expects a function that will take a "combinable" thing at each step and produce a Promise of a "combinable" thing to be used for the next step, and a function that will take the final "combinable" thing after the generator has been exhausted and produce a Promise of the final result
- asyncGen is the async generator to process

In FP, the idea of a "combinable" thing is known as a Monoid, which defines some laws that detail the expected behaviour of combining two of them together.
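As a sanity check of foldAsyncGen on its own, numbers under addition form a simple monoid, so counting the yielded docs is a small fold (countDocs is just an illustrative name):

// of: every doc counts as 1; concat: addition; empty: 0
const countDocs = foldAsyncGen(_ => 1, (a, b) => a + b, 0)(
  acc => Promise.resolve(acc), // step: nothing extra to do between steps
  acc => Promise.resolve(acc)  // fin: the final count is the result
)

// countDocs(getDocs(5)).then(n => console.log(n)) // => 5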
We can then create a Monoid that will be used to carry through the first, last and batch of values when stepping through the generator.
const Accum = (first, last, batch) => ({
  first,
  last,
  batch,
})

Accum.empty = Accum(null, null, []) // an initial instance of `Accum`
Accum.of = x => Accum(x, x, [x])    // an `Accum` instance of a single value
Accum.concat = (a, b) =>            // how to combine two `Accum` instances together
  Accum(
    a.first == null ? b.first : a.first, // keep the earliest `first`
    b.last == null ? a.last : b.last,    // keep the latest `last` (the guard makes `empty` a right identity)
    a.batch.concat(b.batch)
  )
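As a quick, non-exhaustive check that Accum behaves like a monoid (relying on the null guards above, so that Accum.empty is an identity on both sides):

const eqAccum = (a, b) => JSON.stringify(a) === JSON.stringify(b)
const x = Accum.of(1), y = Accum.of(2), z = Accum.of(3)

console.log(eqAccum(Accum.concat(Accum.empty, x), x)) // left identity
console.log(eqAccum(Accum.concat(x, Accum.empty), x)) // right identity
console.log(eqAccum(
  Accum.concat(Accum.concat(x, y), z),
  Accum.concat(x, Accum.concat(y, z))
)) // associativity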
To capture the idea of flushing the accumulating batches, we can create another function that takes an onFlush function, which will perform some action in a returned Promise with the values being flushed, and a size n at which to flush the batch.
Accum.flush = onFlush => n => acc => {
  const size = n || acc.batch.length // treat n = 0 as "flush everything remaining"
  return acc.batch.length < n || size === 0
    ? Promise.resolve(acc) // nothing (or not enough) to flush yet
    : onFlush(acc.batch.slice(0, size))
        .then(_ => Accum(acc.first, acc.last, acc.batch.slice(size)))
}
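For instance, flushing a three-item batch with n = 2 emits the first two values and keeps the rest (logFlush is just an illustrative name):

const logFlush = Accum.flush(b => Promise.resolve(console.log('emit', b)))

logFlush(2)(Accum(1, 3, [1, 2, 3]))
  .then(acc => console.log(acc)) // logs "emit [ 1, 2 ]", then { first: 1, last: 3, batch: [ 3 ] }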
We can also now define how we can fold over the Accum instances.
Accum.foldAsyncGen = foldAsyncGen(Accum.of, Accum.concat, Accum.empty)
With the above utilities defined, we can now use them to model your specific problem.
const emit = batch => // This is an analog of where you would emit your batches
  new Promise((resolve) => resolve(console.log(batch)))

const flushEmit = Accum.flush(emit)

// flush and emit every 10 items, and also the remaining batch when finished
const fold = Accum.foldAsyncGen(flushEmit(10), flushEmit(0))
And finally, run with your example:
fold(getDocs(100))
  .then(({ first, last }) => console.log('done', first, last))
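If the docs come from a real MongoDB cursor rather than the fake generator, the same fold applies unchanged, e.g. with the cursorToAsyncGen sketch from earlier (collection being a hypothetical MongoDB collection handle):

fold(cursorToAsyncGen(collection.find({})))
  .then(({ first, last }) => console.log('done', first, last))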