For example, suppose I run over the same RDD of numbers, where one flow filters for the even numbers and averages them, and the other filters for the odd numbers and sums them. If I write this as two pipelines over the same RDD, it creates two executions that scan the RDD twice, which can be expensive in terms of IO.
How can this IO be reduced so the data is read only once, without rewriting the logic into a single pipeline? A framework that takes two pipelines and merges them into one is fine, of course, as long as developers can continue to work on each pipeline independently (in the real case, these pipelines are loaded from separate modules).
The point is not to use cache() to achieve this.
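For concreteness, a minimal sketch of the two independent pipelines (the names here are just illustrative):

// Two separate pipelines over the same RDD: each action triggers
// its own job, so `nums` is scanned twice.
val nums = sc.range(0L, 1000000L)
val evenMean = nums.filter(_ % 2 == 0).mean()         // first scan
val oddSum   = nums.filter(_ % 2 != 0).reduce(_ + _)  // second scan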
Since your question is rather vague, let's think about general strategies that can be used to approach this problem.
A standard solution here would be caching, but since you explicitly want to avoid it, I assume there are some additional limitations here, and that similar solutions, like checkpointing or persisting with a disk-based storage level, are not acceptable either. It means you have to find some way to manipulate the pipeline itself.
Although multiple transformations can be squashed together, every transformation creates a new RDD. This, combined with your statement about caching, sets relatively strong constraints on possible solutions.
Let's start with the simplest possible case, where all pipelines can be expressed as single-stage jobs. This restricts our choices to map-only jobs and simple map-reduce jobs (like the one described in your question). Pipelines like this can be easily expressed as a sequence of operations on local iterators. So the following
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

def isEven(x: Long) = x % 2 == 0
def isOdd(x: Long) = !isEven(x)

// p1: mean of the even numbers
def p1(rdd: RDD[Long]) = {
  rdd
    .filter(isEven _)
    .aggregate(StatCounter())(_ merge _, _ merge _)
    .mean
}

// p2: sum of the odd numbers
def p2(rdd: RDD[Long]) = {
  rdd
    .filter(isOdd _)
    .reduce(_ + _)
}
could be expressed as:
def p1(rdd: RDD[Long]) = {
  rdd
    .mapPartitions(iter =>
      Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)))
    .collect
    .reduce(_ merge _)
    .mean
}

def p2(rdd: RDD[Long]) = {
  rdd
    .mapPartitions(iter =>
      Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _)))
    .collect
    .reduce(_ + _)
    // the final step here is the identity (compare with .mean in p1)
}
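Assuming a live SparkContext sc, both formulations of each pipeline agree; for example:

p1(sc.range(0L, 100L)) // 49.0: mean of the evens 0, 2, ..., 98
p2(sc.range(0L, 100L)) // 2500: sum of the odds 1, 3, ..., 99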
At this point we can rewrite separate jobs as follows:
def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = {
  rdd.mapPartitions(iter => {
    // Materialize the partition so both pipelines can traverse it
    val items = iter.toList
    Iterator((f(items.iterator), g(items.iterator)))
  })
}

def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = {
  rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2)))
}

def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2))
val rdd = sc.range(0L, 100L)
def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _)
evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity)
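For the rdd defined above this returns (49.0, 2500): the mean of the even numbers and the sum of the odd ones, now computed in a single job that scans the data once.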
The biggest issue here is that we have to eagerly evaluate each partition to be able to apply the individual pipelines. It means that overall memory requirements can be significantly higher compared to the same logic applied separately. Without caching* it is also useless in the case of multistage jobs.
An alternative solution is to process data element-wise but treat each item as a tuple of seqs:
def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = {
  rdd.map { case (ts, us) => (ts.map(f), us.map(g)) }
}

def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
    f: T => Boolean, g: U => Boolean) = {
  rdd.map { case (ts, us) => (ts.filter(f), us.filter(g)) }
}

def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X)
    (s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = {
  rdd.mapPartitions(iter => {
    // Fold both sides of every element into one pair of accumulators
    var accT = zt
    var accU = zu
    iter.foreach { case (ts, us) =>
      accT = ts.foldLeft(accT)(s1)
      accU = us.foldLeft(accU)(s2)
    }
    Iterator((accT, accU))
  }).reduce { case ((v1, x1), (v2, x2)) => (m1(v1, v2), m2(x1, x2)) }
}
With an API like this we can express the initial pipelines as:
val rddSeq = rdd.map(x => (Seq(x), Seq(x)))

aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
  _ merge _, _ + _, _ merge _, _ + _
)
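This returns a (StatCounter, Long) pair from a single job; for the example rdd, the first component's mean is 49.0 and the second component is 2500.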
This approach is slightly more powerful than the former one (you can easily implement some subset of byKey methods if needed; see the sketch below), and memory requirements in typical pipelines should be comparable to the core API, but it is also significantly more intrusive.
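For example, here is how a keyed reduction could look. This is my own illustrative sketch, not part of the pipelines above: reduceByKey2 and mergeOpt are assumed names, and the idea is simply to tag each side's records so that both pipelines share a single shuffle.

import scala.reflect.ClassTag

// Hypothetical helper: combine two optional accumulators with `op`
def mergeOpt[A](a: Option[A], b: Option[A])(op: (A, A) => A): Option[A] =
  (a, b) match {
    case (Some(x), Some(y)) => Some(op(x, y))
    case (x, None)          => x
    case (None, y)          => y
  }

// Sketch of a byKey method for the tuple-of-seqs representation:
// records from both sides are tagged and shuffled together once.
def reduceByKey2[K: ClassTag, T, U](rdd: RDD[(Seq[(K, T)], Seq[(K, U)])])(
    f: (T, T) => T, g: (U, U) => U): RDD[(K, (Option[T], Option[U]))] = {
  rdd
    .flatMap { case (ts, us) =>
      ts.map { case (k, t) => (k, (Option(t), Option.empty[U])) } ++
        us.map { case (k, u) => (k, (Option.empty[T], Option(u))) }
    }
    .reduceByKey { case ((t1, u1), (t2, u2)) =>
      (mergeOpt(t1, t2)(f), mergeOpt(u1, u2)(g))
    }
}

A key that appears in only one of the two flows simply surfaces with None on the other side, so the pipelines stay logically independent.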
* You can check an answer provided by eje for multiplexing examples.