I'm looking to build a pipeline pattern in Scala. Once I have written the pipeline objects, I would like to be able to connect them together like this:
Pipeline1 :: Pipeline2 :: Pipeline3 ...
I have experimented with a few ideas so far; some work and some don't, but none of them completely gets rid of the boilerplate code. The following is the closest I've got.
First, define the abstract classes Pipeline and Source:
// I is the input type and O is the output type of the pipeline
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {
  val source = p
  val name: String
  def produce(): O
  def stats(): String
}

abstract class Source[+T] extends Pipeline[AnyRef, T](null)
Next, I created two pipelines and tried to link them together:
// this creates a random integer between 0 and 10
class RandomInteger extends Source[Int] {
  override val name = "randInt"
  def produce() = {
    scala.math.round(scala.math.random().toFloat * 10)
  }
  def stats() = "this pipeline is stateless"
}

// multiply it by ten
class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // this is a simple state of the pipeline
  override val name = "Times"
  def produce() = {
    val i = source.produce()
    count += 1 // updating the state
    i * 10
  }
  def stats() = "this pipeline has been called " + count + " times"
}

object TimesTen {
  // this code achieves the desired connection using ::
  // but it has to be repeated in each pipeline subclass.
  // how can this boilerplate be removed or abstracted away?
  def ::(that: Pipeline[_, Int]) = new TimesTen(that)
}
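The companion-object trick relies on a Scala rule: an operator whose name ends in a colon is right-associative and is invoked on its right-hand operand, so `new RandomInteger() :: TimesTen` means `TimesTen.::(new RandomInteger())`. A minimal sketch of that desugaring, using hypothetical `Box` and `Wrap` types that are not part of the question:

```scala
// Hypothetical types, used only to show how a colon-ending operator is resolved.
final case class Box(value: Int)

object Wrap {
  // Because `::` ends in ':', `b :: Wrap` is rewritten as `Wrap.::(b)`.
  def ::(b: Box): List[Int] = List(b.value, b.value * 10)
}

object ColonDemo {
  val sugared: List[Int] = Box(3) :: Wrap      // operator syntax
  val desugared: List[Int] = Wrap.::(Box(3))   // the equivalent explicit call
}
```

One consequence: `a :: B :: C` groups as `a :: (B :: C)`, which would call `C.::(B)` with a companion object as the argument, so chaining three stages this way needs explicit parentheses, as in `(a :: B) :: C`.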
This is the main object, where the two pipelines are linked:
object Pipeline {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() :: TimesTen
    println(p.source)
    for (i <- 0 to 10)
      println(p.produce())
    println(p.stats())
  }
}
So this code works. However, I would have to repeat the code in the TimesTen companion object for every pipeline class I write, which is certainly not desirable. Is there a better way to do this? Reflection might work, but I have heard bad things about it, such as that anything involving reflection is bad design. I'm also unsure about Scala's support for reflection.
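One way to avoid repeating the `::` method without reflection, sketched under the assumption that every stage can be built from its upstream pipeline by a single constructor call, is to move `::` into a generic base class that each companion extends and to pass it the stage's constructor as a function. `Connector` and the deterministic `Constant` source below are hypothetical names, not from the question:

```scala
// Base class from the question.
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {
  val source = p
  val name: String
  def produce(): O
  def stats(): String
}

abstract class Source[+T] extends Pipeline[AnyRef, T](null)

// Hypothetical helper: `::` is written once; `make` builds a stage
// from its upstream pipeline.
abstract class Connector[I, O](make: Pipeline[_, I] => Pipeline[I, O]) {
  def ::(that: Pipeline[_, I]): Pipeline[I, O] = make(that)
}

// Hypothetical deterministic source, so the result is predictable.
class Constant(n: Int) extends Source[Int] {
  val name = "const"
  def produce(): Int = n
  def stats(): String = "stateless"
}

class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // internal state, not exposed to other stages
  val name = "times"
  def produce(): Int = {
    val i = source.produce()
    count += 1
    i * 10
  }
  def stats(): String = "called " + count + " times"
}

// The per-class boilerplate shrinks to a single line.
object TimesTen extends Connector[Int, Int](p => new TimesTen(p))

object ConnectorDemo {
  val p: Pipeline[Int, Int] = new Constant(4) :: TimesTen
}
```

The companion still exists, but it no longer restates `::`; the only per-class code is the constructor function handed to `Connector`.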
Thank you for your time.
Update: I designed this toy problem to make it easy to understand. In general, and as my application requires, each pipeline object has its own state, which should ideally be encapsulated within the object itself rather than exposed to every other pipeline. I have modified the code above to reflect this. I would prefer an object-based solution. I'm still experimenting and will let you know if I find one.
Update 2: After some thought, I think a pipeline is really just a generalized function that contains some internal state and can compose a Function0 function with a Function1 function. In Scala, however, the Function0 class does not have a compose() or andThen() method.
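To illustrate this observation: a `() => A` value has no `andThen`, but the same computation typed as `Unit => A` is a `Function1` and composes directly. A minimal sketch; the names `source0`, `source1`, `timesTen` and `UnitInputDemo` are hypothetical:

```scala
object UnitInputDemo {
  // A Function0: the standard library gives it no andThen/compose.
  val source0: () => Int = () => 4

  // The same computation as a Function1 that takes Unit, which does have andThen.
  val source1: Unit => Int = (_: Unit) => source0()

  val timesTen: Int => Int = x => x * 10

  // andThen runs source1 first, then feeds its result to timesTen.
  val pipeline: Unit => Int = source1 andThen timesTen
}
```

Calling `UnitInputDemo.pipeline(())` passes the unit value explicitly and yields `40`.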
Here is a solution with objects, using andThen. The idea is to force the creation of Function1 objects by using Unit as the input type. Connecting two Pipelines creates a new Pipeline that chains the two functions together. This solution allows Pipelines to keep internal state.
A further simplification would be to use apply() instead of produce(). This is left as an exercise for the reader.
abstract class Pipeline[-I, +O] {
  val name: String
  def produce: I => O
  def stats(): String

  // Connect this pipeline to seg. Because Pipeline is contravariant in I,
  // seg may accept any supertype of O.
  def ->[X](seg: Pipeline[O, X]): Pipeline[I, X] = {
    val func = this.produce
    val outerName = this.name
    new Pipeline[I, X] {
      val name = outerName + "." + seg.name
      def produce = func andThen seg.produce
      def stats() = seg.stats()
    }
  }
}

abstract class Source[+T] extends Pipeline[Unit, T]

class RandomInteger extends Source[Int] {
  override val name = "randInt"
  // the Unit input makes the source a Function1, which supports andThen
  def produce: Unit => Int = (_: Unit) => scala.math.round(scala.math.random().toFloat * 10)
  def stats() = "stateless"
}

class TimesTen() extends Pipeline[Int, Int] {
  private var count = 0 // internal state, hidden from other pipelines
  override val name = "times"
  def produce: Int => Int = (x: Int) => {
    count += 1
    x * 10
  }
  def stats() = "called " + count + " times"
}

object Main {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()
    for (i <- 0 to 10)
      println(p.produce(()))
    println(p.name)    // prints "randInt.times"
    println(p.stats()) // prints "called 11 times"
  }
}
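The apply() simplification mentioned in the answer could look roughly like the following sketch, in which each stage extends `I => O` directly, so running a stage is ordinary function application. Connecting stages with `->` follows the answer; the `Constant` source and the `ApplyDemo` object are hypothetical:

```scala
// Hypothetical variant: a stage *is* a function, so apply() replaces produce.
abstract class Pipeline[-I, +O] extends (I => O) {
  val name: String
  def stats(): String

  // Chain this stage with seg; the result is again a named Pipeline.
  def ->[X](seg: Pipeline[O, X]): Pipeline[I, X] = {
    val first = this
    new Pipeline[I, X] {
      val name = first.name + "." + seg.name
      def apply(i: I): X = seg(first(i))
      def stats(): String = seg.stats()
    }
  }
}

abstract class Source[+T] extends Pipeline[Unit, T]

// Hypothetical deterministic source, so results are predictable.
class Constant(n: Int) extends Source[Int] {
  val name = "const"
  def apply(u: Unit): Int = n
  def stats(): String = "stateless"
}

class TimesTen extends Pipeline[Int, Int] {
  private var count = 0 // internal state, still encapsulated
  val name = "times"
  def apply(x: Int): Int = { count += 1; x * 10 }
  def stats(): String = "called " + count + " times"
}

object ApplyDemo {
  val p: Pipeline[Unit, Int] = new Constant(4) -> new TimesTen
}
```

Because every stage is now an ordinary `Function1`, it can also be passed anywhere a function is expected, for example `List(1, 2).map(new TimesTen)`.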