I'm just learning highland.js after being inspired by NoFlo.js. I want streams to be able to operate recursively. In this contrived example I provide a number that gets multiplied by two, and only results <= 512 are kept. Each multiplied number is fed back into the system. The code below works, but if I take the doto function out of the pipeline it doesn't process any numbers. I suspect that I'm sending the data back into returnPipe incorrectly. Is there a better way to pipe data back into a system? What am I missing?
###
input>--m--->multiplyBy2>---+
        |                   |
        |                   |
        +---<returnPipe<----+
###
H = require('highland')
input = H([1])
returnPipe = H.pipeline(
  H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
  .map((v)-> return v * 2)
  .filter((v)-> return v <= 512)
  .pipe(returnPipe)
From the documentation: doto spins off a stream while re-emitting the source stream. This means that, as far as the pipeline is concerned, there is still a function passing the stream through it. If you take doto out, the original stream doesn't make it back through the return stream on the next iteration.
If you are going to use pipeline, you have to pass it a method that takes a stream and emits a stream. For example, you could replace the doto method with something like H.map((v)=>{console.log(v); return v;}) in the call to H.pipeline. Since that method consumes a stream and emits a stream, data will continue to flow when the stream is passed back into it by .pipe(returnPipe).
EDIT: To answer your question, when you declare let input = H([1]) you are actually creating a stream right there. You can remove any reference to the pipeline and returnPipe and produce the same output with the following code:
let input = H([1]);
input.map((v)=> {
  return v * 2;
})
.filter((v)=> {
  if (v <= 512) {
    console.log(v);
  }
  return v <= 512;
})
.pipe(input);
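To make the intended data flow concrete without depending on any stream library, here is a minimal sketch of the same feedback loop using a plain array as the work queue. This is not highland.js code: the names feedbackLoop, start, and limit are made up for this illustration, and the sketch assumes start is a positive number (with 0 or a negative value nothing ever exceeds the limit, so the loop would not terminate).

```javascript
// Plain-JavaScript sketch of the loop: double each value, keep results
// <= limit, and feed every kept result back in as new input.
// feedbackLoop is a hypothetical helper, not part of highland.js.
// Assumes start > 0; otherwise the loop never terminates.
function feedbackLoop(start, limit) {
  const queue = [start];   // plays the role of the merged input
  const emitted = [];      // values that passed the filter
  while (queue.length > 0) {
    const doubled = queue.shift() * 2;   // the map step
    if (doubled <= limit) {              // the filter step
      emitted.push(doubled);
      queue.push(doubled);               // the return pipe
    }
  }
  return emitted;
}

console.log(feedbackLoop(1, 512));
// prints [ 2, 4, 8, 16, 32, 64, 128, 256, 512 ]
```

Whatever stream wiring is used, these nine values are what the doubling loop should print when it starts from 1.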
My original intent was to write a recursive file reader in highland.js. I posted to the highland.js GitHub issues list, and Victor Vu helped me put this together with a fantastic write-up.
H = require('highland')
fs = require('fs')
fsPath = require('path')
###
directory >---m----------> dirFilesStream >-------------f----> out
              |                                         |
              |                                         |
              +-------------< returnPipe <--------------+
legend: (m)erge (f)ork
+ directory has the initial file
+ dirFilesStream does a directory listing
+ out prints out the full path of the file
+ directoryFilter runs stat and filters on directories
+ returnPipe the only way i can
###
directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
  H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
    fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
  H.wrapCallback(fs.stat)(path).map (v) ->
    v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log
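For comparison, the same loop can be sketched in plain Node.js with a work queue standing in for the merge point. This is only an illustration of the data flow, not the highland.js approach above: listRecursively, pending, and found are hypothetical names, and the sketch uses the synchronous fs calls, so it has none of the stream's laziness or backpressure. Like fs.stat in the stream version, fs.statSync follows symbolic links, so a link cycle would keep the loop running.

```javascript
const fs = require('fs');
const path = require('path');

// Hypothetical helper: lists every entry under rootDir. Directories are
// pushed back onto the queue, which is the role returnPipe plays above.
function listRecursively(rootDir) {
  const pending = [rootDir];   // merge point: initial directory plus fed-back directories
  const found = [];            // every path that would reach `out`
  while (pending.length > 0) {
    const parentPath = pending.shift();
    for (const name of fs.readdirSync(parentPath)) {
      const fullPath = path.join(parentPath, name);
      found.push(fullPath);                        // fork: goes to the output...
      if (fs.statSync(fullPath).isDirectory()) {
        pending.push(fullPath);                    // ...and directories loop back
      }
    }
  }
  return found;
}
```

Calling listRecursively('someDirectory') returns an array of joined paths, one per file or directory found beneath someDirectory.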