
Prevent more IO with multiple pipelines on the same RDD

Tags:

apache-spark

For example, suppose I run over the same RDD of numbers, where one flow filters for the even numbers and averages them, and the other filters for the odd numbers and sums them. If I write this as two pipelines over the same RDD, this creates two executions that scan the RDD twice, which can be expensive in terms of IO.

How can this IO be reduced so that the data is read only once, without rewriting the logic into a single pipeline? A framework that takes two pipelines and merges them into one is of course fine, as long as developers can continue to work on each pipeline independently (in the real case, these pipelines are loaded from separate modules).

The point is not to use cache() to achieve this.

asked May 24 '16 by IttayD

1 Answer

Since your question is rather vague, let's think about general strategies that can be used to approach this problem.

A standard solution here would be caching, but since you explicitly want to avoid it, I assume there are some additional limitations here. It suggests that similar solutions, like

  • in-memory data storage (like Ignite, suggested by heenenee)
  • accelerated storage like Alluxio

are not acceptable either. It means you have to find some way to manipulate the pipeline itself.
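
For contrast, the caching baseline that the question rules out would look roughly like this (a minimal sketch, assuming an rdd: RDD[Long] and the p1/p2 pipelines defined below):

// Ruled out by the question; shown only as the baseline to beat.
// A single cache() lets both pipelines reuse the scanned data.
val cached = rdd.cache()
val evenMean = p1(cached) // the first action scans the source and populates the cache
val oddSum   = p2(cached) // the second action reads from the cache, not from the source
cached.unpersist()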

Although multiple transformations can be squashed together, every transformation creates a new RDD. This, combined with your statement about caching, sets relatively strong constraints on possible solutions.

Let's start with the simplest possible case, where all pipelines can be expressed as single-stage jobs. This restricts our choices to map-only jobs and simple map-reduce jobs (like the one described in your question). Pipelines like this can be easily expressed as a sequence of operations on local iterators. So the following

import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

def isEven(x: Long) = x % 2 == 0
def isOdd(x: Long) = !isEven(x)

// Pipeline 1: mean of the even numbers
def p1(rdd: RDD[Long]) = {
  rdd
    .filter(isEven _)
    .aggregate(StatCounter())(_ merge _, _ merge _)
    .mean
}

// Pipeline 2: sum of the odd numbers
def p2(rdd: RDD[Long]) = {
  rdd
    .filter(isOdd _)
    .reduce(_ + _)
}

could be expressed as:

def p1(rdd: RDD[Long]) = {
  rdd
    .mapPartitions(iter =>
      // fold each partition locally into a single StatCounter
      Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)))
    .collect
    .reduce(_ merge _)
    .mean
}

def p2(rdd: RDD[Long]) = {
  rdd
    .mapPartitions(iter =>
      // fold each partition locally into a single sum
      Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _)))
    .collect
    .reduce(_ + _)
    // no final transformation needed (identity)
}

At this point we can rewrite the separate jobs as follows:

// Run two per-partition functions over the same data in a single pass
def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = {
  rdd.mapPartitions(iter => {
    // NOTE: materializes the whole partition so it can be traversed twice
    val items = iter.toList
    Iterator((f(items.iterator), g(items.iterator)))
  })
}

// Collect the per-partition results and combine them pairwise on the driver
def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = {
  rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2)))
}

// Apply the final per-pipeline transformation to each side of the result
def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2))

val rdd = sc.range(0L, 100L)

// per-partition versions of p1 and p2
def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _)

// a single scan of the data evaluates both pipelines
evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity)
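
For the sample rdd = sc.range(0L, 100L) this should return (49.0, 2500): the mean of the even numbers 0, 2, ..., 98 and the sum of the odd numbers 1, 3, ..., 99.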

The biggest issue here is that we have to eagerly evaluate each partition to be able to apply the individual pipelines. It means that the overall memory requirements can be significantly higher compared to the same logic applied separately. Without caching* it is also useless in the case of multistage jobs.

An alternative solution is to process the data element-wise, but treat each item as a tuple of seqs:

// Element-wise map over both sides of the (Seq, Seq) pair
def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = {
  rdd.map { case (ts, us) => (ts.map(f), us.map(g)) }
}

// Element-wise filter over both sides
def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
    f: T => Boolean, g: U => Boolean) = {
  rdd.map { case (ts, us) => (ts.filter(f), us.filter(g)) }
}

// Aggregate both sides in one pass: s1/s2 fold within a partition,
// m1/m2 merge the per-partition results
def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X)
    (s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = {
  rdd.mapPartitions(iter => {
    var accT = zt
    var accU = zu
    iter.foreach { case (ts, us) =>
      accT = ts.foldLeft(accT)(s1)
      accU = us.foldLeft(accU)(s2)
    }
    Iterator((accT, accU))
  }).reduce { case ((v1, x1), (v2, x2)) => (m1(v1, v2), m2(x1, x2)) }
}

With an API like this we can express the initial pipelines as:

// duplicate every element so each pipeline sees its own copy
val rddSeq = rdd.map(x => (Seq(x), Seq(x)))

aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
  _ merge _, _ + _, _ merge _, _ + _
)
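
For the same sample rdd this should produce a (StatCounter, Long) pair: a StatCounter with mean 49.0 on the even side and the sum 2500 on the odd side.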

This approach is slightly more powerful than the former one (you can easily implement some subset of byKey methods if needed; see the sketch below) and memory requirements in typical pipelines should be comparable to the core API, but it is also significantly more intrusive.
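
To make the byKey remark concrete, here is one possible sketch (my own illustration, not part of the original answer) of a reduceByKey-style operation in the same style. It assumes both pipelines key by the same type K, and tags each value with an Option so a single shuffle serves both reductions:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Reduce both pipelines by key with one scan and one shuffle.
// Keys seen by only one pipeline end up with None on the other side.
def reduceByKey2[K: ClassTag, V1, V2](rdd: RDD[(Seq[(K, V1)], Seq[(K, V2)])])(
    f: (V1, V1) => V1, g: (V2, V2) => V2): RDD[(K, (Option[V1], Option[V2]))] = {
  rdd
    .flatMap { case (ts, us) =>
      // tag each value with the pipeline it belongs to
      ts.map { case (k, v) => (k, (Option(v), Option.empty[V2])) } ++
      us.map { case (k, v) => (k, (Option.empty[V1], Option(v))) }
    }
    .reduceByKey { case ((l1, r1), (l2, r2)) =>
      ((l1, l2) match {
        case (Some(a), Some(b)) => Some(f(a, b))
        case _                  => l1 orElse l2
      },
      (r1, r2) match {
        case (Some(a), Some(b)) => Some(g(a, b))
        case _                  => r1 orElse r2
      })
    }
}

For example, with Long values on both sides, reduceByKey2(pairs)(_ + _, _ * _) would sum one pipeline's values and multiply the other's per key, in a single pass over the data.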


* You can check an answer provided by eje for multiplexing examples.

answered Oct 14 '22 by zero323