 

val vs def performance on Spark Dataframe

Consider the following code, and hence a question on performance - imagined, of course, at scale:

import org.apache.spark.sql.functions.{col, when} // needed for the expressions below

val df = sc.parallelize(Seq(
   ("r1", 1, 1),
   ("r2", 6, 4),
   ("r3", 4, 1),
   ("r4", 1, 2)
   )).toDF("ID", "a", "b")

val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

// or

def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

df.withColumn("ones", ones).explain

Below are the two Physical Plans, for def and for val - which are identical:

 == Physical Plan == **def**
 *(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
   +- Scan[obj#759]


 == Physical Plan == **val**
 *(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
    +- Scan[obj#779] 

So, there is the discussion on:

val vs def performance.

Then:

  • I see no difference in the .explains. OK.

  • From elsewhere: val evaluates when defined, def - when called.

  • I am assuming that it makes no difference whether a val or def is used here, as it is essentially within a loop and there is a reduce. Is this correct?
  • Will df.schema.map(c => c.name).drop(1) be executed per dataframe row? There is of course no need. Does Catalyst optimize this?
  • If the above is true, in that the statement is executed every time for the columns to process, how can we make that piece of code occur just once? Should we make a val of val ones = df.schema.map(c => c.name).drop(1)?
  • val vs def is more than a Scala question; there is also a Spark component.
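The val-versus-def evaluation rule underlying these questions can be shown in plain Scala, independent of Spark; the counter here is purely illustrative:

```scala
// Plain Scala, no Spark needed: a val body runs once at definition,
// a def body runs on every call.
var evaluations = 0
val v = { evaluations += 1; 10 }   // evaluated immediately, exactly once
def d = { evaluations += 1; 10 }   // evaluated on each call
val afterVal = evaluations         // 1: only the val has run so far
d
d
val afterTwoDefCalls = evaluations // 3: each def call re-ran the body
println(s"after val: $afterVal, after two def calls: $afterTwoDefCalls")
```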

To the downvoter I pose this question because, while the following is very clear, the val ones above has more to it than the code below, and the code below is not iterated:

var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)
thebluephantom asked Mar 20 '26 19:03


1 Answer

There are two core concepts at hand here: Spark DAG creation and evaluation, and Scala's val vs def definitions. These are orthogonal.

I see no difference in the .explains

You see no difference because from Spark's perspective, the query is the same. It doesn't matter to the analyser if you store the graph in a val or create it each time with a def.

From elsewhere: val evaluates when defined, def - when called.

This is Scala semantics. A val is an immutable reference which gets evaluated once, at the declaration site. A def is a method definition, and its body is re-evaluated on every call; if you build a column expression inside it, a new one is created each time you call it. For example:

def ones = 
  df
   .schema
   .map(c => c.name)
   .drop(1)
   .map(x => when(col(x) === 1, 1).otherwise(0))
   .reduce(_ + _)

val firstcall = ones
val secondCall = ones

The code above builds the column expression twice, producing two separate (but structurally identical) expression trees, which is why the plans come out the same.
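The allocation behaviour can be modeled without a Spark session; the case class below is a stand-in for a Spark Column tree, not Spark API:

```scala
// Stand-in for a Spark Column expression tree.
final case class Expr(cols: Seq[String])

def onesDef = Expr(Seq("a", "b"))  // builds a fresh instance per call
val onesVal = Expr(Seq("a", "b"))  // builds one instance, once

val first  = onesDef
val second = onesDef
println(first eq second)           // false: two distinct objects
println(first == second)           // true: structurally equal, hence the same plan
```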

I am assuming that it makes no difference whether a val or def is used here, as it is essentially within a loop and there is a reduce. Is this correct?

I'm not sure which loop you're talking about, but see my answer above for the distinction between the two.

Will df.schema.map(c => c.name).drop(1) be executed per dataframe row? There is of course no need. Does Catalyst optimize this?

No, and it is worth being precise here: df.schema.map(c => c.name).drop(1) is plain Scala collection code that runs once, on the driver, when the column expression is built. The drop(1) removes the first column name ("ID") from the Seq of names; it never touches rows, and it is not re-executed per row. Catalyst only ever sees the resulting expression, so there is nothing for it to optimize away.
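A quick Spark-free check of what drop(1) acts on; the List here stands in for df.schema.map(_.name):

```scala
// drop(1) operates on the Scala Seq of column names derived from the
// schema, not on DataFrame rows.
val names = List("ID", "a", "b")   // stand-in for df.schema.map(_.name)
val kept  = names.drop(1)          // removes the first *name*, "ID"
println(kept)                      // List(a, b)
```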

If the above is true, in that the statement is executed every time for the columns to process, how can we make that piece of code occur just once? Should we make a val of val ones = df.schema.map(c => c.name).drop(1)?

It occurs only once each time the expression is built: once overall with a val, and once per call with a def (and in your example there is exactly one data frame). Hoisting the name list into its own val is still reasonable hygiene if the expression is rebuilt repeatedly.
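A sketch of that hoisting pattern in plain Scala; the counter and helper names are illustrative stand-ins, not Spark API:

```scala
// Hoisting the name list into a val means it is computed once, even if
// an expression is built from it many times.
var nameListBuilds = 0
def buildNames(): Seq[String] = { nameListBuilds += 1; Seq("ID", "a", "b").drop(1) }

val colNames = buildNames()                                 // runs once
def exprFrom(ns: Seq[String]): String = ns.mkString(" + ")  // stand-in for the when/otherwise reduce
val e1 = exprFrom(colNames)
val e2 = exprFrom(colNames)
println(s"name list built $nameListBuilds time(s); expr: $e1")
```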

Yuval Itzchakov answered Mar 22 '26 15:03