How to time Spark program execution speed

I want to time my Spark program's execution speed, but due to lazy evaluation it's quite difficult. Consider this (meaningless) code:

val graph = GraphLoader.edgeListFile(context, args(0))
// outerJoinVertices takes a second argument list with a mapping function;
// here we keep each vertex's degree as its attribute
val graph_degs = graph.outerJoinVertices(graph.degrees)((_, _, deg) => deg.getOrElse(0)).triplets.cache

/* I'd need to start the timer here */
val t1 = System.currentTimeMillis
val edges = graph_degs.flatMap(trip => { /* do something */ })
                      .union(graph_degs)

val count = edges.count
val t2 = System.currentTimeMillis
/* I'd need to stop the timer here */

println("It took " + (t2 - t1) + " ms to count " + count)

The thing is, transformations are lazy, so nothing gets evaluated before the val count = edges.count line. But t1 gets assigned immediately, even though the code above it hasn't actually run yet: the transformations above t1 are only evaluated after the timer has started, regardless of their position in the code. That's a problem...

In the Spark Web UI I can't find anything useful about it, since I need the time spent after that specific line of code. Do you think there is an easy solution to see when a group of transformations actually gets evaluated?

Matt asked Apr 20 '17 16:04


1 Answer

Since consecutive transformations (within the same task - meaning they are not separated by shuffles and are performed as part of the same action) are executed as a single "step", Spark does not measure them individually. And from driver code, you can't either.

What you can do is measure the duration of applying your function to each record, and use an Accumulator to sum it all up, e.g.:

// create accumulator
val durationAccumulator = sc.longAccumulator("flatMapDuration")

// "wrap" your "doSomething" operation with time measurement, and add to accumulator
val edges = rdd.flatMap(trip => {
  val t1 = System.currentTimeMillis
  val result = doSomething(trip)
  val t2 = System.currentTimeMillis
  durationAccumulator.add(t2 - t1)
  result
})

// perform the action that would trigger evaluation
val count = edges.count

// now you can read the accumulated value
println("It took " + durationAccumulator.value + " ms to flatMap " + count)

You can repeat this for any individual transformation.

Disclaimers:

  • Of course, this will not include the time Spark spent shuffling things around and doing the actual counting - for that, indeed, the Spark UI is your best resource (though see the driver-side sketch below for a coarser wall-clock alternative).
  • Note that accumulators are sensitive to things like retries - a retried task will update the accumulator twice.
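If a coarse wall-clock measurement of the whole stage is enough, here is a minimal driver-side sketch (not part of the original answer, and assuming the question's graph_degs / edges definitions): force the cached upstream RDD to materialize with an extra action before starting the timer, so the timed action only executes the transformations you care about.

// force evaluation of the upstream lineage first;
// graph_degs is cached, so this pays its cost once up front
graph_degs.count

val t1 = System.currentTimeMillis
val count = edges.count  // now only flatMap/union/count run here
val t2 = System.currentTimeMillis
println("It took " + (t2 - t1) + " ms to count " + count)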

Style Note: You can make this code more reusable by creating a measure function that "wraps" around any function and updates a given accumulator:

// write this once (LongAccumulator comes from Spark's util package):
import org.apache.spark.util.LongAccumulator

def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => {
  val t1 = System.currentTimeMillis
  val result = action(input)
  val t2 = System.currentTimeMillis
  acc.add(t2 - t1)
  result
}

// use it with any transformation:
rdd.flatMap(measure(doSomething, durationAccumulator))
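For instance, each transformation can get its own accumulator (a sketch: doSomething and doSomethingElse are hypothetical stand-in functions, not from the original post):

// hypothetical example: one accumulator per measured transformation
val flatMapAcc = sc.longAccumulator("flatMapDuration")
val mapAcc = sc.longAccumulator("mapDuration")

val result = rdd.flatMap(measure(doSomething, flatMapAcc))
                .map(measure(doSomethingElse, mapAcc))

result.count  // trigger evaluation
println("flatMap took " + flatMapAcc.value + " ms, map took " + mapAcc.value + " ms")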
Tzach Zohar answered Sep 21 '22 19:09