
generator/block to iterator/stream conversion

Basically I want to convert this:

def data(block: T => Unit)

to a Stream (dataToStream is a hypothetical function that does this conversion):

val dataStream: Stream[T] = dataToStream(data)

I suppose this problem could be solved with continuations:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// a naive solution is to collect all the data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it (prepending built the list in reverse order)
dataList.reverse.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// this is where the black magic of continuations would have to be used,
// and that magic is too hard for me to understand.
// Does anybody know what the dataToStream function could look like?

Thanks, Dawid

Dawid Grzesiak asked Sep 26 '10 12:09



2 Answers

EDITED: Modified the examples to show the laziness of traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

The toTraversable method will convert your data function into a Traversable collection. By itself, it's nothing huge, but you can convert this to a TraversableView which is lazy. Here's an example:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

The unfortunate nature of the take method is that it must go one past the last value it needs before it can stop, which is why "Generating 4" is printed above. It does still terminate early instead of running the generator to the end. The above code would behave the same without the ".view" call. However, here's a more compelling example:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3
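
Why the generator runs one step past the requested count can be illustrated with a small sketch. This is not the library's actual implementation of take; takeFromGenerator is a hypothetical helper that shows the general idea of escaping a callback-driven generator with a control-flow exception (here scala.util.control.Breaks). The limit can only be checked when the generator calls back again, so the generator has already produced one extra value by the time the loop is abandoned.

```scala
import scala.util.control.Breaks.{break, breakable}

// Hypothetical helper (not part of the Scala library): feed at most n values
// from a callback-style generator to f, then abandon the generator.
def takeFromGenerator[T](gen: (T => Unit) => Unit, n: Int)(f: T => Unit): Unit = {
  var seen = 0
  breakable {
    gen { x =>
      // The limit is checked on entry to the callback, so the generator has
      // already produced x (one value too many) when we break out.
      if (seen >= n) break()
      f(x)
      seen += 1
    }
  }
}

// Same generator as in the transcript above.
def data(f: Int => Unit): Unit = for (i <- 1 to 10) {
  println("Generating " + i)
  f(i)
}

// Prints the same five lines as the take(2) transcript above.
takeFromGenerator[Int](data, 2)(i => println(i))
```

The break is implemented as an exception thrown through the generator, so a generator that catches all Throwables would defeat this technique.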

So in conclusion, I believe the collection you're looking for is TraversableView, which is easiest to create by making a Traversable and then calling "view" on it. If you really wanted the Stream type, here's a method that works in 2.8.0.final and will make a "Stream" without threads:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

The unfortunate nature of this method is that it will iterate over the entire traversable before making the stream. This also means all the values need to be buffered in memory. The only alternative is to resort to threads.

As an aside: This was the motivating reason to prefer Traversables as direct returns from scalax.io.File methods: "lines" "chars" and "bytes".

jsuereth answered Sep 30 '22 03:09



Here's a simple solution that spawns a thread that consumes the data. It posts the data to a SynchronousQueue. A stream that pulls data from the queue is created and returned:

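def generatortostream[T](f: (T => Unit) => Unit): Stream[T] = {
  // a SynchronousQueue has no capacity: each put blocks until the consumer takes
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    // wrap each value in Some; a final None marks the end of the data
    def run(): Unit = { f((x: T) => queue.put(Some(x))); queue.put(None) }
  }
  new Thread(callbackthread).start()
  // pull values lazily until the None marker arrives
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}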
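
Since the question also mentions iterators, the same queue technique can be sketched as an Iterator. The name generatorToIterator and the daemon-thread choice are assumptions of this sketch, not part of the answer above. The daemon flag matters because a consumer that stops early leaves the producer thread blocked in put forever; as a daemon it at least does not keep the JVM alive.

```scala
import java.util.concurrent.SynchronousQueue

// Hypothetical variant of the function above that returns an Iterator.
def generatorToIterator[T](f: (T => Unit) => Unit): Iterator[T] = {
  val queue = new SynchronousQueue[Option[T]]
  val producer = new Thread(new Runnable {
    def run(): Unit = {
      f(x => queue.put(Some(x))) // blocks until the consumer asks for a value
      queue.put(None)            // end-of-data marker
    }
  })
  // Assumption of this sketch: a producer left blocked by an early-stopping
  // consumer should not prevent the JVM from exiting.
  producer.setDaemon(true)
  producer.start()
  Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get)
}

// The generator from the question.
def data(block: Int => Unit): Unit = for (i <- 0 to 10) block(i)

// Only the first three values are pulled from the generator.
val firstThree = generatorToIterator[Int](data).take(3).toList // List(0, 1, 2)
```

Because the queue has no capacity, the generator advances only when the consumer requests the next element, so nothing is buffered. The cost is one thread per conversion and a thread hand-off per element.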
Geoff Reedy answered Sep 30 '22 02:09
