Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Circular data flow in highlandjs

I'm just learning highland.js after being inspired by NoFlo.js. I want to be able to have streams operate recursively. In this contrived example I will provide a number that get's multiplied by two and we filter results <= 512. Once the number is multiplied it gets fed back into the system. The code I have works but if I take out the doto function in the pipeline it doesn't process any numbers. I suspect that I'm sending the data back into the returnPipe incorrectly. Is there a better way to pipe data back into a system? What am I missing?

###
  input>--m--->multiplyBy2>---+
          |                   |
          |                   |
          +---<returnPipe<----+
###

H = require('highland')

input = H([1])
returnPipe = H.pipeline(
  H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
 .map((v)-> return v * 2)
 .filter((v)-> return v <= 512)
 .pipe(returnPipe)
like image 481
Michael Connor Avatar asked Aug 24 '15 21:08

Michael Connor


2 Answers

From the documentation: doto spins off a stream while re-emitting the source stream. This means that as far as the pipeline is concerned, there is a function that is still passing the stream through it. If you take doto out, the original stream doesn't make it back through return stream on the next iteration.

If you are going to use pipeline, you have to pass it a method that takes a stream and emits a stream. For example, you could replace the doto method with something like H.map((v)=>{console.log(v); return v;}) in the call to H.pipeline and since that method consumes a stream and emits a stream, it will continue to flow when the stream is passed back into it on .pipe(returnPipe)

EDIT: To answer your question, when you declare let input = H([1]) you are actually creating a stream right there. You can remove any reference to the pipeline and returnPipe and produce the same output with the following code:

let input = H([1]);

input.map((v)=> {
  return v * 2;
})
.filter((v)=> {
  if (v <= 512) {
    console.log(v);
  }
  return v <= 512;
})
.pipe(input);
like image 146
jaredkwright Avatar answered Nov 15 '22 06:11

jaredkwright


My original intent was to write a recursive file reader in highland.js. I posted to the highland.js github issues list and Victor Vu helped me put this together with a fantastic write-up.

H = require('highland')
fs = require('fs')
fsPath = require('path')

###
  directory >---m----------> dirFilesStream >-------------f----> out
                |                                         |
                |                                         |
                +-------------< returnPipe <--------------+

  legend: (m)erge  (f)ork

 + directory         has the initial file
 + dirListStream     does a directory listing
 + out               prints out the full path of the file
 + directoryFilter   runs stat and filters on directories
 + returnPipe        the only way i can

###

directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
  H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
    fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
  H.wrapCallback(fs.stat)(path).map (v) ->
    v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log
like image 44
Michael Connor Avatar answered Nov 15 '22 04:11

Michael Connor