If some entities in a system can function as producers of data or events, and other entities can function as consumers, does it make sense to externalize these "orthogonal concerns" into Producer and Consumer typeclasses?
I can see that the Haskell Pipes library uses this approach, and I appreciate that this question may look pretty basic to people coming from a Haskell background, but I would be interested in a Scala perspective and examples, because I haven't seen many.
You should look into this article by Matt Might.
It gives you a simple implementation of `Producer`, `Consumer`, and `Transducer` (`Pipe` in the Haskell library you mentioned) and an example of how to use them to create a web server.
Basically, each `Producer` extends `Runnable` and has a private buffer for its output elements. The buffer is a Java `ArrayBlockingQueue`, which is thread-safe.
Each `Consumer` is also a `Runnable` and has an input buffer built the same way.
When you chain a `Consumer` to a `Producer`, you create another `Runnable`. On start, it starts the `Producer` and the `Consumer` (both are `Runnable`) and then transfers data between them.
When you chain a `Transducer` to a `Producer`, it creates a new `Producer`.
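To make that last point concrete, here is a minimal sketch, not the article's code. It assumes a hypothetical `Transducer[I, O]` trait that is both a `Consumer[I]` and a `Producer[O]`. The names `Pipeline.chain`, `Counter`, and `Doubler` are invented for illustration, and the threads are marked as daemons (also an assumption of this sketch) so the JVM can exit while the loops are still running.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Condensed versions of the traits described above.
// Assumption of this sketch: daemon threads, so endless loops do not block JVM exit.
trait Coroutine extends Runnable {
  def start(): Unit = {
    val t = new Thread(this)
    t.setDaemon(true)
    t.start()
  }
}

trait Producer[O] extends Coroutine {
  private val outputs = new ArrayBlockingQueue[O](1024)
  protected def put(output: O): Unit = outputs.put(output)
  def next(): O = outputs.take()
}

trait Consumer[I] extends Coroutine {
  private val inputs = new ArrayBlockingQueue[I](1024)
  def accept(input: I): Unit = inputs.put(input)
  protected def get(): I = inputs.take()
}

// Hypothetical: a Transducer reads I values with get() and emits O values with put().
trait Transducer[I, O] extends Consumer[I] with Producer[O]

object Pipeline {
  // Hypothetical helper: chaining a Transducer to a Producer yields a new Producer.
  def chain[A, B](source: Producer[A], t: Transducer[A, B]): Producer[B] =
    new Producer[B] {
      // This thread only pumps elements from the source into the transducer.
      def run(): Unit = while (true) t.accept(source.next())
      // Downstream readers see the transducer's output directly.
      override def next(): B = t.next()
      override def start(): Unit = {
        source.start()
        t.start()
        super.start()
      }
    }
}

// Example stages, invented for this sketch.
class Counter(from: Int) extends Producer[Int] {
  def run(): Unit = { var i = from; while (true) { put(i); i += 1 } }
}

class Doubler extends Transducer[Int, Int] {
  def run(): Unit = while (true) put(get() * 2)
}
```

With these definitions, `Pipeline.chain(new Counter(0), new Doubler)` is itself a `Producer[Int]`: after `start()`, successive calls to `next()` return the doubled values in order, because every stage is a single thread reading from a FIFO queue.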
So if you follow his implementation, you should be able to write pipelines in the style of Haskell:
listen ==> connect ==> process ==> reply
Here is some code copied and improved from the link above:
import java.util.concurrent.ArrayBlockingQueue

// A Coroutine is a Runnable that can start itself on its own thread.
trait Coroutine extends Runnable {
  def start(): Unit = {
    val myThread = new Thread(this)
    myThread.start()
  }
}

trait Producer[O] extends Coroutine {
  // Bounded, thread-safe output buffer; put blocks while it is full.
  private val outputs = new ArrayBlockingQueue[O](1024)
  protected def put(output: O): Unit = outputs.put(output)
  def next(): O = outputs.take()

  // Chaining a Consumer yields a Coroutine that pumps elements across.
  def ==>[I >: O](consumer: Consumer[I]): Coroutine = {
    val that = this
    new Coroutine {
      def run(): Unit = {
        while (true) { val o = that.next(); consumer.accept(o) }
      }
      override def start(): Unit = {
        that.start()
        consumer.start()
        super.start()
      }
    }
  }
}

trait Consumer[I] extends Coroutine {
  // Bounded, thread-safe input buffer; take blocks while it is empty.
  private val inputs = new ArrayBlockingQueue[I](1024)
  def accept(input: I): Unit = inputs.put(input)
  protected def get(): I = inputs.take()
}
And here is how you could use it:
// Emits zero, zero + 1, zero + 2, ... forever.
case class IntProducer(zero: Int) extends Producer[Int] {
  def run(): Unit = {
    var i = zero
    while (true) { put(i); i += 1 }
  }
}

// Prints every element it receives.
object Printer extends Consumer[Any] {
  def run(): Unit = {
    while (true) { println(get()) }
  }
}

val pip = IntProducer(0) ==> Printer
pip.start()
To see more examples and how to handle the `Transducer`, look at my Gist.
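To tie this back to the typeclass framing in the question: the code above uses inheritance, not typeclasses. As a rough sketch of the alternative, the capability can be supplied as an implicit instance instead of a supertype. Everything here (`Produces`, `Consumes`, `Transfer.drain`, and the instances for `List` and `Vector`) is hypothetical, and it is a pure, single-threaded illustration, not a replacement for the coroutine code.

```scala
// Hypothetical typeclass: S can hand out values of type A one at a time.
trait Produces[S, A] {
  def next(source: S): Option[(A, S)]
}

object Produces {
  // Instance in the companion object, so it is found without an import.
  implicit def fromList[A]: Produces[List[A], A] = new Produces[List[A], A] {
    def next(source: List[A]): Option[(A, List[A])] = source match {
      case head :: tail => Some((head, tail))
      case Nil          => None
    }
  }
}

// Hypothetical typeclass: K can absorb values of type A.
trait Consumes[K, A] {
  def accept(sink: K, a: A): K
}

object Consumes {
  implicit def intoVector[A]: Consumes[Vector[A], A] = new Consumes[Vector[A], A] {
    def accept(sink: Vector[A], a: A): Vector[A] = sink :+ a
  }
}

object Transfer {
  // Moves every element from source to sink, for any pair of types
  // that has the required instances.
  @annotation.tailrec
  def drain[S, K, A](source: S, sink: K)(implicit p: Produces[S, A], c: Consumes[K, A]): K =
    p.next(source) match {
      case Some((a, rest)) => drain(rest, c.accept(sink, a))(p, c)
      case None            => sink
    }
}
```

For example, `Transfer.drain(List(1, 2, 3), Vector.empty[Int])` yields `Vector(1, 2, 3)`. The trade-off is that existing types such as `List` gain the capability without being modified, at the cost of threading implicit instances through every generic function.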