
Does Spark optimize chained transformations?

It is often cleaner to express a complicated map operation as a series of chained map calls in code rather than as one large operation. I know the Spark DAG scheduler performs optimizations, but will it also optimize chained operations in this way?

Here's a contrived example where a list of distinct dates is pulled out of a field in a CSV file:

csv.map(row => row.split(","))
   .map(row => row(6)) // extract the proper field
   .map(date_field => DateTime.parse(date_field).withTimeAtStartOfDay())
   .distinct()

Would this example be more efficient as one map operation followed by a distinct()?
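That is, something along these lines (a sketch of what I mean by one map operation, using the same Joda-Time calls as above):

csv.map { row =>
  val field = row.split(",")(6) // extract the proper field
  DateTime.parse(field).withTimeAtStartOfDay()
}.distinct()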

jhappoldt asked Jul 30 '14

2 Answers

Guess I'll just turn my comment into an answer since no one else has decided to answer. Basically this is one of the main points of having a lazy DAG architecture. Since nothing executes until the final DAG is seen, optimizations like combining operations that don't require a shuffle are relatively trivial (I'll see if I can find the actual code). Say you have a bunch of maps in a row: Spark knows it can discard the results of each previous map unless you cache; caching prevents an RDD from having to be recalculated if you use it more than once. So consolidating into one map function will be no more than a micro-optimization, and will likely have no effect when you consider that many MR-style jobs are IO bound.
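The caching point is easy to see in a local spark-shell (a minimal sketch; the println is only there to show when the map function actually runs):

val counted = sc.parallelize(1 to 10).map { v => println(s"computing $v"); v * 2 }
counted.cache()    // without this, the map would re-run for every action on counted
counted.count()    // first action: runs the map and populates the cache
counted.collect()  // second action: served from the cache, the map is not re-run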

UPDATE: From looking through the Spark user list, it seems that a stage can contain multiple operations; specifically, operations that can be chained together, like maps, are put into one stage.
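You can see that pipelining in an RDD's debug string: a chain of maps collapses into a single stage, and only the shuffle from distinct() starts a new one (a rough sketch; the input path is made up):

val csv = sc.textFile("dates.csv") // made-up input file
val dates = csv.map(_.split(","))
  .map(_(6))
  .map(_.take(10)) // stand-in for the actual date parsing
  .distinct()

println(dates.toDebugString) // the three maps share one stage; distinct() adds a shuffle boundary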

aaronman answered Nov 10 '22


Short Answer: yes, but only for a linear chain of dependencies.

Long Answer: compared to the query optimizer of Spark SQL/DataFrames, it is almost non-existent.

The Spark core API doesn't rewrite a DAG's execution plan even when doing so is obviously beneficial. Here is an example.

Consider the DAG:

A > B > D
  > C >

where D is collected and A is not persisted (persisting is an expensive operation; besides, if you don't know whether D will be collected, you can't decide when to unpersist A). Ideally, an optimizer should convert this DAG into the linear and much cheaper A > Tuple2(B, C) > D. So let's test it:

// acc counts how many times A's map function is evaluated
val acc = sc.accumulator(0)
val O = sc.parallelize(1 to 100)
val A = O.map{
  v =>
    acc += 1
    v * 2
}
val B = A.map(_*2)
val C = A.map(_*3)
val D = B.zip(C).map(v => v._1 + v._2)

D.collect()

// this would hold if A were evaluated only once per element
assert(acc.value == 100)

The result?

200 did not equal 100

It is clear that the unoptimized DAG was executed: A's map function ran twice per element, once for B and once for C.
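If you want the linear plan you have to fuse it yourself, e.g. (a sketch reusing O from the snippet above, with a fresh accumulator):

// Manual fusion: compute B and C together in one map over A,
// so A's map function runs only once per element.
val acc2 = sc.accumulator(0)
val A2 = O.map { v =>
  acc2 += 1
  v * 2
}
val D2 = A2.map(v => (v * 2, v * 3)).map { case (b, c) => b + c }

D2.collect()
assert(acc2.value == 100) // holds: A2 is evaluated exactly once per element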

In addition, such a feature (or anything close to it, e.g. a cost-based optimizer choosing between broadcast and shuffle joins) has never been proposed. Probably because most Spark developers prefer more direct control over execution, or because such optimizations have very limited effect compared to what a SQL query optimizer can do.

tribbloid answered Nov 10 '22