
Is there a FIFO stream in Scala?

I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of

  • immutable.Stream (a stream that can be finite and memoizes the elements that have already been read)
  • mutable.Queue (which allows elements to be added to the FIFO)

The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.

Actually, I'm a bit surprised that the collection library does not (seem to) include such a data structure, since it is, IMO, quite a classic one.

My questions:

  • 1) Did I overlook something? Is there already a class providing this functionality?

  • 2) OK, if it's not included in the collection library, then it might be just a trivial combination of existing collection classes. However, I tried to find this trivial code, but my implementation still looks quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?

    import java.io.Closeable
    import scala.collection.mutable.Queue

    class FifoStream[T] extends Closeable {

        // None is the end-of-stream marker.
        val queue = new Queue[Option[T]]

        lazy val stream = nextStreamElem

        private def nextStreamElem: Stream[T] = next() match {
            case Some(elem) => Stream.cons(elem, nextStreamElem)
            case None       => Stream.empty
        }

        /** Returns next element in the queue (may wait for it to be inserted). */
        private def next() = {
            queue.synchronized {
                // Loop rather than `if`: wait() may return spuriously.
                while (queue.isEmpty) queue.wait()
                queue.dequeue()
            }
        }

        /** Adds new elements to this stream. */
        def enqueue(elems: T*): Unit = {
            queue.synchronized {
                queue ++= elems.map(Some(_))
                queue.notify()
            }
        }

        /** Closes this stream. */
        def close(): Unit = {
            queue.synchronized {
                queue.enqueue(None)
                queue.notify()
            }
        }
    }
    

Paradigmatic's solution (slightly modified)

Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns an immutable, memoized stream (allowing repeatable reads), which fits my needs. For completeness, here is the code:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
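
A minimal usage sketch of the lazy-val variant above, written without JavaConversions so that it also compiles on newer Scala versions. The names MemoFifo and MemoFifoDemo are hypothetical and chosen only for this illustration; the 50 ms sleep is arbitrary.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Hypothetical re-statement of the class above; None marks "closed".
class MemoFifo[A](queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]()) {
  // Evaluated once; the resulting Stream memoizes every element it has read.
  lazy val toStream: Stream[A] = next()

  private def next(): Stream[A] = queue.take() match {
    case Some(a) => a #:: next() // the tail is only evaluated on demand
    case None    => Stream.empty
  }

  def enqueue(as: A*): Unit = as.foreach(a => queue.put(Some(a)))
  def close(): Unit = queue.put(None)
}

object MemoFifoDemo {
  def main(args: Array[String]): Unit = {
    val fifo = new MemoFifo[Int]()
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        fifo.enqueue(1, 2)
        Thread.sleep(50) // the consumer blocks in take() meanwhile
        fifo.enqueue(3)
        fifo.close()
      }
    })
    producer.start()
    val first  = fifo.toStream.toList // blocks until close() has been called
    val second = fifo.toStream.toList // served from the memoized stream
    producer.join()
    println(first)  // List(1, 2, 3)
    println(second) // List(1, 2, 3)
  }
}
```

Because toStream is a lazy val, the second traversal never touches the queue. Note that Stream is deprecated from Scala 2.13 on in favor of LazyList; the sketch keeps Stream to match the code above.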
Stefan Endrullis asked Sep 26 '11 09:09



2 Answers

In Scala, streams are "functional iterators". People expect them to be pure (no side effects) and immutable. In your case, every time you iterate over the stream you modify the queue (so it is not pure). This can cause a lot of misunderstandings, because iterating over the same stream twice will give two different results.

That being said, you should use Java's BlockingQueues rather than rolling your own implementation. They are considered well implemented in terms of safety and performance. Here is the cleanest code I can think of (using your approach):

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  // Each call takes from the queue, so every traversal consumes elements.
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  // Builds a FIFOStream backed by a fresh, unbounded LinkedBlockingQueue.
  def apply[A]() = new FIFOStream[A]( new LinkedBlockingQueue[Option[A]] )
}
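
The purity caveat from the start of this answer can be made concrete with a small sketch. The helper drain is hypothetical; it does the same thing as a def-based toStream, namely build a fresh stream from the queue on every call. Two end markers are enqueued on purpose so that the second traversal terminates.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ImpureStreamDemo {
  // Hypothetical helper: every call reads (and removes) elements up to the next None.
  def drain[A](q: LinkedBlockingQueue[Option[A]]): Stream[A] = q.take() match {
    case Some(a) => a #:: drain(q)
    case None    => Stream.empty
  }

  def main(args: Array[String]): Unit = {
    val q = new LinkedBlockingQueue[Option[Int]]()
    q.add(Some(1)); q.add(Some(2)); q.add(None)
    q.add(Some(3)); q.add(None)

    val first  = drain(q).toList
    val second = drain(q).toList
    println(first)  // List(1, 2)
    println(second) // List(3)
  }
}
```

The same expression yields different results on each evaluation. With only one end marker in the queue, the second call would block forever in take().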
paradigmatic answered Nov 15 '22 20:11



I'm assuming you're looking for something like java.util.concurrent.BlockingQueue?

Akka has a BoundedBlockingQueue implementation of this interface. There are, of course, also the implementations available in java.util.concurrent.
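
As a rough sketch of the java.util.concurrent route, here is a bounded producer/consumer built on ArrayBlockingQueue. The object name BoundedQueueSketch, the helper drainUntilClosed, the capacity of 2, and the use of None as a "closed" marker are all assumptions made for this illustration, not part of any library.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueSketch {
  // Hypothetical helper: collect elements until the None end marker arrives.
  // take() blocks while the queue is empty.
  def drainUntilClosed[A](q: ArrayBlockingQueue[Option[A]]): List[A] = {
    val out = List.newBuilder[A]
    var open = true
    while (open) {
      q.take() match {
        case Some(a) => out += a
        case None    => open = false
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val q = new ArrayBlockingQueue[Option[Int]](2) // arbitrary capacity
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        (1 to 5).foreach(i => q.put(Some(i))) // put() blocks while the queue is full
        q.put(None)                           // signal "closed"
      }
    })
    producer.start()
    println(drainUntilClosed(q)) // List(1, 2, 3, 4, 5)
    producer.join()
  }
}
```

The bounded capacity gives back-pressure for free: a fast producer is paused in put() until the consumer catches up.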

You might also consider using Akka's actors for whatever it is you are doing. With actors, new events or messages are pushed to you instead of you having to pull them.

drstevens answered Nov 15 '22 21:11
