I have a simple program:
import scalaz._
import stream._
object Play extends App {
val in1 = io.linesR("C:/tmp/as.txt")
val in2 = io.linesR("C:/tmp/bs.txt")
val p = (in1 merge in2) to io.stdOutLines
p.run.run
}
The file as.txt
contains five a
s and the file bs.txt
contain 3 b
s. I see this sort of output:
a
b
b
a
a
b
a
a
a
However, when I change the declaration of in2
as follows:
val in2 = io.stdInLines
Then I get what I think is unexpected behaviour. According to the documentation 1, the program should pull data non-deterministically from each stream according to whichever stream is quicker to supply stuff. This should mean that I see a bunch of a
s immediately printed to the console but this is not what happens at all.
Indeed, until I press ENTER
, nothing happens. It's quite clear that the behaviour looks a lot like what I would expect if I was choosing a stream at random to get the next element from and then, if that stream was blocking, the merged process blocks too (even if the other stream contains data).
What is going on?
1 - well, OK, there is very little documentation, but Dan Spiewak said very clearly in his talk that it would grab whoever was the first to supply data
The problem is in the implementation of stdInLines
. It is blocking, it never Task.fork
s a thread.
Try changing the implentation of stdInLines
to this one:
def stdInLines: Process[Task,String] =
Process.repeatEval(Task.apply {
Option(scala.Console.readLine())
.getOrElse(throw Cause.Terminated(Cause.End))
})
The original io.stdInLines
is running the readLine()
in the same thread, so it always waits there until you type something.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With