
Modelling producer-consumer semantics with typeclasses?

If some entities in a system can function as producers of data or events, and other entities can function as consumers, does it make sense to externalize these "orthogonal concerns" into Producer and Consumer typeclasses?

I can see that the Haskell Pipes library uses this approach, and I appreciate that this question may look basic to people coming from a Haskell background, but I would be interested in a Scala perspective and examples, because I don't see many.

James McCabe asked Nov 01 '22 18:11



1 Answer

You should look into this article by Matt Might.

It gives a simple implementation of Producer, Consumer, and Transducer (Pipe in the Haskell library you mentioned), and an example of how to use them to build a web server.

Basically, each Producer extends Runnable and has a private buffer for its output elements. The buffer is a Java ArrayBlockingQueue, which is thread-safe.

Each Consumer is also a Runnable and has an input buffer using a similar architecture.

When you chain a Consumer to a Producer, you create another Runnable. When started, it starts both the Producer and the Consumer (each is a Runnable) and then transfers data between them.

When you chain a Transducer to a Producer it creates a new Producer.

So if you follow his implementation, you should be able to write pipelines in the style of Haskell:

listen ==> connect ==> process ==> reply

Here is some code copied and improved from the link above:

import java.util.concurrent.ArrayBlockingQueue

// A Coroutine is a Runnable that runs on its own thread.
trait Coroutine extends Runnable {
    def start(): Unit = {
        val myThread = new Thread(this)
        myThread.start()
    }
}

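trait Producer[O] extends Coroutine {
    // Thread-safe bounded buffer: put blocks when full, take blocks when empty.
    private val outputs = new ArrayBlockingQueue[O](1024)
    protected def put(output: O): Unit = outputs.put(output)
    def next(): O = outputs.take()

    // Chaining a Consumer yields a Coroutine that pumps values between the two.
    def ==>[I >: O](consumer: Consumer[I]): Coroutine = {
        val that = this
        new Coroutine {
            def run(): Unit = {
                while (true) { val o = that.next(); consumer.accept(o) }
            }

            // Start both ends first, then the pump itself.
            override def start(): Unit = {
                that.start()
                consumer.start()
                super.start()
            }
        }
    }
}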

trait Consumer[I] extends Coroutine {
    private val inputs = new ArrayBlockingQueue[I](1024)
    def accept(input: I): Unit = inputs.put(input)
    protected def get(): I = inputs.take()
}

And here is how you could use it:

// Emits zero, zero + 1, zero + 2, ... forever.
case class IntProducer(zero: Int) extends Producer[Int] {
    def run(): Unit = {
        var i = zero
        while (true) { put(i); i += 1 }
    }
}

// Prints every value it receives.
object Printer extends Consumer[Any] {
    def run(): Unit = {
        while (true) { println(get()) }
    }
}

val pip = IntProducer(0) ==> Printer
pip.start()

To see more examples and how to handle the Transducer, look at my Gist.
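As a rough illustration of the "chaining a Transducer to a Producer creates a new Producer" idea, here is a minimal sketch. It is not the code from the article or the Gist: the Transducer trait, the ==> overload for it, and the Counter and Doubler classes are all assumptions made up for this example, and the threads are marked as daemon threads (also an assumption) so that a short program can exit. The traits from above are repeated in compact form so the block compiles on its own.

```scala
import java.util.concurrent.ArrayBlockingQueue

trait Coroutine extends Runnable {
  def start(): Unit = {
    val thread = new Thread(this)
    thread.setDaemon(true) // assumption: daemon threads, so the JVM can exit
    thread.start()
  }
}

trait Consumer[I] extends Coroutine {
  private val inputs = new ArrayBlockingQueue[I](1024)
  def accept(input: I): Unit = inputs.put(input)
  protected def get(): I = inputs.take()
}

// Hypothetical: a Transducer is both a Consumer of I and a Producer of O.
trait Transducer[I, O] extends Consumer[I] with Producer[O]

trait Producer[O] extends Coroutine {
  private val outputs = new ArrayBlockingQueue[O](1024)
  protected def put(output: O): Unit = outputs.put(output)
  def next(): O = outputs.take()

  // Hypothetical overload: chaining a Transducer yields a new Producer.
  def ==>[O2](t: Transducer[O, O2]): Producer[O2] = {
    val upstream = this
    new Producer[O2] {
      // Pump values from the upstream producer into the transducer.
      def run(): Unit = while (true) t.accept(upstream.next())

      // Read results straight from the transducer's output buffer
      // (this wrapper's own buffer stays unused).
      override def next(): O2 = t.next()

      override def start(): Unit = {
        upstream.start()
        t.start()
        super.start()
      }
    }
  }
}

// Illustrative producer: emits from, from + 1, from + 2, ...
class Counter(from: Int) extends Producer[Int] {
  def run(): Unit = {
    var i = from
    while (true) { put(i); i += 1 }
  }
}

// Illustrative transducer: doubles every value it receives.
class Doubler extends Transducer[Int, Int] {
  def run(): Unit = while (true) put(get() * 2)
}
```

With these definitions, (new Counter(1)) ==> (new Doubler) gives a Producer[Int]; after calling start() on it, successive calls to next() return 2, 4, 6, and so on. Because the result is itself a Producer, it can be chained again in the same way.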

gwenzek answered Nov 15 '22 07:11
