How to elegantly implement the pipeline pattern using Scala

Tags: scala, pipeline

I'm looking to build a pipeline pattern in Scala. Once I have written the pipeline objects, I would like to be able to connect them together like this:

Pipeline1 :: Pipeline2 :: Pipeline3 ...

I have experimented with a few ideas so far. Some work and some don't, but none of them completely gets rid of the boilerplate code. The following is the closest I've got.

First, define the abstract classes Pipeline and Source:

// I is the input type and O is the output type of the pipeline
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {

  val source = p
  val name: String
  def produce(): O
  def stats():String
}
abstract class Source[+T] extends Pipeline[AnyRef, T](null)

Next, I created two pipelines and tried to link them together:

// this creates a random integer
class RandomInteger extends Source[Int] {
  override val name = "randInt"

  def produce() = {
    // random integer between 0 and 10
    java.lang.Math.round(java.lang.Math.random().toFloat * 10)
  }

  def stats() = "this pipeline is stateless"
}

// multiply it by ten
class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // this is a simple state of the pipeline
  override val name = "Times"
  def produce() = {
    val i = source.produce()
    count += 1 // updating the state
    i * 10
  }
  def stats() = "this pipeline has been called for " + count + " times"
}

object TimesTen {
  // this code achieves the desired connection using ::
  // but this has to be repeated in each pipeline subclass. 
  // how to remove or abstract away this boilerplate code? 
  def ::(that: Pipeline[_, Int]) = new TimesTen(that)
}

This is the main object, where the two pipelines are linked:

object Pipeline {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() :: TimesTen
    println(p.source)
    for (i <- 0 to 10)
      println(p.produce())
    println(p.stats())
  }
}
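
The line `new RandomInteger() :: TimesTen` compiles because Scala treats any operator whose name ends in `:` as right-associative and invokes it on the right-hand operand. A minimal stand-alone illustration, using a hypothetical object `Wrap`:

```scala
// Hypothetical object: a method name ending in ':' is right-associative,
// so the operand on the RIGHT receives the call.
object Wrap {
  def ::(s: String): String = "[" + s + "]"
}

object RightAssocDemo {
  def main(args: Array[String]): Unit = {
    val viaOperator = "x" :: Wrap   // compiled as Wrap.::("x")
    val viaMethod   = Wrap.::("x")
    println(viaOperator)              // prints [x]
    println(viaOperator == viaMethod) // prints true
  }
}
```

This is why TimesTen needs a companion object in the first place. It also means a longer chain such as `a :: B :: C` groups as `a :: (B :: C)`, which in the question's design would call `C.::(B)` with a companion object as the argument and fail to type-check unless written as `(a :: B) :: C`.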

This code works, but I would have to repeat the code in the TimesTen companion object for every pipeline class I write, which is certainly not desirable. Is there a better way to do this? Reflection might work, but I have heard bad things about it, for example that anything involving reflection is bad design. I'm also unsure about Scala's support for reflection.

Thank you for your time.
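
One way to avoid a companion object per stage, sketched here under the assumption that a left-associative operator is acceptable in place of `::`, is to define a single generic connector in the base class that passes the upstream pipeline to the next stage's constructor. The operator name `~>`, the classes `ConstInt` and `PlusOne`, and `ConnectorDemo` are hypothetical; the rest follows the question's classes.

```scala
// The question's base class, plus one generic connector method.
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {
  val source = p
  val name: String
  def produce(): O
  def stats(): String

  // Hypothetical connector (not from the question): hands this pipeline to
  // the next stage's constructor. It is written once here, so stages need
  // no companion object and no reflection.
  def ~>[P](next: Pipeline[I, O] => P): P = next(this)
}
abstract class Source[+T] extends Pipeline[AnyRef, T](null)

// Hypothetical deterministic source standing in for RandomInteger.
class ConstInt(n: Int) extends Source[Int] {
  val name = "const"
  def produce(): Int = n
  def stats(): String = "this pipeline is stateless"
}

class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // state stays private to this stage
  val name = "Times"
  def produce(): Int = {
    val i = source.produce()
    count += 1
    i * 10
  }
  def stats(): String = "this pipeline has been called for " + count + " times"
}

// Hypothetical second stage, to show chaining.
class PlusOne(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  val name = "PlusOne"
  def produce(): Int = source.produce() + 1
  def stats(): String = "this pipeline is stateless"
}

object ConnectorDemo {
  def main(args: Array[String]): Unit = {
    // Parentheses keep each `_` placeholder scoped to its own constructor call.
    val p = new ConstInt(3) ~> (new TimesTen(_)) ~> (new PlusOne(_))
    println(p.produce()) // prints 31
  }
}
```

A left-associative name is used because an operator ending in `:` is invoked on its right-hand operand, which here would be a constructor function rather than a pipeline. The trade-off is that every stage's constructor must take exactly one upstream pipeline.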

Update: I designed this toy problem to make it easy to understand. In the general case, and as my application requires, each pipeline object has a state, which should ideally be encapsulated within the object itself rather than exposed to every other pipeline. I have modified the code above to reflect this. I would prefer an object-based solution. I'm still experimenting and will let you know if I find one.

Update 2: After some thought, I think a pipeline is really just a generalized function that carries some internal state and can compose a Function0 with a Function1. In Scala, however, Function0 does not have a compose() or andThen() method.
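
The composition described in Update 2 can be sketched by giving Function0 an andThen through an implicit class. The implicit class `Function0AndThen`, its `andThen` method, and the two demo objects are hypothetical helpers, not part of the standard library; the stateful stage is modeled as a Function1 subclass with a private counter.

```scala
object Function0Compose {
  // Hypothetical extension: Function0 has no andThen in the standard
  // library, so this implicit class adds one.
  implicit class Function0AndThen[A](f: () => A) {
    // Returns a new thunk that runs f and passes its result to g.
    def andThen[B](g: A => B): () => B = () => g(f())
  }

  // A stateful stage: an ordinary Function1 whose call counter stays private.
  class TimesTen extends (Int => Int) {
    private var count = 0
    def apply(x: Int): Int = { count += 1; x * 10 }
    def stats: String = "called for " + count + " times"
  }
}

object Function0ComposeDemo {
  import Function0Compose._

  def main(args: Array[String]): Unit = {
    val source: () => Int = () => 4 // fixed value instead of a random one
    val times = new TimesTen
    val pipeline = source andThen times andThen ((x: Int) => x + 1)
    println(pipeline())  // prints 41
    println(times.stats) // prints called for 1 times
  }
}
```

Each call to `pipeline()` re-runs the whole chain, and the counter lives inside the `TimesTen` instance rather than being shared with other stages.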

asked Mar 07 '12 at 01:03 by Albert Li




1 Answer

Here is an object-based solution using andThen. The idea is to force every stage, including the source, to be a Function1 by giving sources the input type Unit. Connecting two Pipelines creates a new Pipeline that composes their two functions. This solution allows Pipelines to keep internal state.

A further simplification would be to use apply() instead of produce(). This is left as an exercise for the reader.
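
The Unit trick can be seen in isolation with plain functions, independent of the Pipeline classes. The object name `UnitSourceDemo` and the constant source value are hypothetical:

```scala
object UnitSourceDemo {
  // A source typed as Unit => Int rather than () => Int, so the standard
  // Function1.andThen can compose it with ordinary stages.
  val source: Unit => Int = (_: Unit) => 7 // hypothetical constant source
  val timesTen: Int => Int = x => x * 10
  val composed: Unit => Int = source andThen timesTen

  def main(args: Array[String]): Unit =
    println(composed(())) // `()` is the only value of type Unit; prints 70
}
```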

abstract class Pipeline[-I, +O] {

  val name: String
  def produce : I => O
  def stats(): String

  def ->[X](seg:Pipeline[_ >: O, X]):Pipeline[I, X] = {
    val func = this.produce
    val outerName = this.name
    new Pipeline[I, X] {
      val name = outerName + "." + seg.name
      def produce = func andThen seg.produce 
      def stats() = seg.stats()
    }
  }
}

abstract class Source[+T] extends Pipeline[Unit, T] {
}

class RandomInteger extends Source[Int] {
  override val name = "randInt"
  def produce: Unit => Int = (_: Unit) => java.lang.Math.round(java.lang.Math.random().toFloat * 10)
  def stats() = "stateless"
}

class TimesTen() extends Pipeline[Int, Int] {
  private var count = 0
  override val name = "times"
  def produce : Int => Int = (x:Int) => {    
    count += 1
    x * 10
  }
  def stats() = "called for " + count + " times"
}


object Main {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()

    for (i <- 0 to 10)
      println(p.produce(()))
    println(p.name)    // prints "randInt.times"
    println(p.stats()) // prints "called for 11 times"
  }
}
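
The apply() simplification left as an exercise above might look like the following sketch, in which Pipeline extends Function1 so that a stage can be called directly. It keeps the answer's class names and `->` operator; the object `ApplyMain` is a hypothetical name, and the Unit argument `()` must still be passed explicitly to the source.

```scala
// Sketch of the apply() variant: a stage *is* a function from I to O.
abstract class Pipeline[-I, +O] extends (I => O) {
  val name: String
  def stats(): String

  // Connect this stage to the next one; as in the answer, the combined
  // stage reports the downstream stage's stats.
  def ->[X](seg: Pipeline[O, X]): Pipeline[I, X] = {
    val first = this // inside the anonymous class, `this` would mean the new stage
    new Pipeline[I, X] {
      val name = first.name + "." + seg.name
      def apply(in: I): X = seg(first(in))
      def stats(): String = seg.stats()
    }
  }
}

abstract class Source[+T] extends Pipeline[Unit, T]

class RandomInteger extends Source[Int] {
  val name = "randInt"
  // random integer between 0 and 10, same formula as the answer
  def apply(u: Unit): Int = java.lang.Math.round(java.lang.Math.random().toFloat * 10)
  def stats(): String = "stateless"
}

class TimesTen extends Pipeline[Int, Int] {
  private var count = 0
  val name = "times"
  def apply(x: Int): Int = { count += 1; x * 10 }
  def stats(): String = "called for " + count + " times"
}

object ApplyMain {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()
    for (_ <- 0 to 10)
      println(p(()))   // apply() replaces produce()
    println(p.name)    // prints randInt.times
    println(p.stats()) // prints called for 11 times
  }
}
```

Because each stage is now an ordinary Function1, a pipeline can also be combined with plain functions through the standard andThen, although the result is then a bare function without `name` or `stats()`.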
answered Sep 28 '22 at 07:09 by Albert Li