I want to time my Spark program's execution, but because transformations are lazy it's quite difficult. Consider this (meaningless) code:
var graph = GraphLoader.edgeListFile(context, args(0))
val graph_degs = graph.outerJoinVertices(graph.degrees).triplets.cache
/* I'd need to start the timer here */
val t1 = System.currentTimeMillis
val edges = graph_degs.flatMap(trip => { /* do something*/ })
.union(graph_degs)
val count = edges.count
val t2 = System.currentTimeMillis
/* I'd need to stop the timer here */
println("It took " + (t2 - t1) + " ms to count " + count)
The thing is, transformations are lazy, so nothing gets evaluated before the val count = edges.count line.
But as I see it, t1 gets its value even though the code above it hasn't been evaluated yet: the code above t1 actually runs after the timer has started, despite its position in the code. That's a problem...
In the Spark Web UI I can't find anything useful, since I need the time spent after that specific line of code. Is there an easy way to see when a group of transformations actually gets evaluated?
Since consecutive transformations within the same task (meaning they are not separated by shuffles and are performed as part of the same action) are executed as a single "step", Spark does not measure them individually. And you can't measure them from driver code either.
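Within a task, Spark pipelines narrow transformations much like chained Scala iterators. As a local analogy only (plain Scala, no Spark involved), this sketch shows that defining the chain runs nothing, and that once it is consumed the two steps interleave record by record, so there is never a point at which "the map is done" that a timer in driver code could observe:

```scala
import scala.collection.mutable.ListBuffer

object PipelineDemo {
  // Records which step touched which element, in execution order
  val log: ListBuffer[String] = ListBuffer.empty[String]

  def run(): List[Int] = {
    log.clear()
    // Defining the chain on an Iterator is lazy: neither function runs yet
    val pipeline = Iterator(1, 2, 3)
      .map { x => log += s"map $x"; x * 10 }
      .filter { x => log += s"filter $x"; x > 10 }
    log += "defined"
    // Consuming the iterator plays the role of the action and triggers the work
    val result = pipeline.toList
    log += "consumed"
    result
  }

  def main(args: Array[String]): Unit = {
    println(run())
    println(log.mkString(", "))
  }
}
```

Running main prints List(20, 30), then a log that starts with "defined" and continues map 1, filter 10, map 2, filter 20, and so on: each record passes through both steps before the next one is read.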
What you can do is measure the duration of applying your function to each record, and use an Accumulator to sum it all up, e.g.:
// create accumulator
val durationAccumulator = sc.longAccumulator("flatMapDuration")
// "wrap" your "doSomething" operation with time measurement, and add to accumulator
val edges = rdd.flatMap(trip => {
  val t1 = System.currentTimeMillis
  val result = doSomething(trip)
  val t2 = System.currentTimeMillis
  durationAccumulator.add(t2 - t1)
  result
})
// perform the action that would trigger evaluation
val count = edges.count
// now you can read the accumulated value
println("It took " + durationAccumulator.value + " ms to flatMap " + count)
You can repeat this for any individual transformation.
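To check the pattern without a cluster, here is a plain-Scala sketch under a few assumptions: a lazy Iterator plays the role of the RDD, java.util.concurrent.atomic.AtomicLong stands in for each LongAccumulator, and doSomething is a hypothetical placeholder. It times two transformations with separate counters and shows that nothing is recorded until the "action" runs. It uses System.nanoTime rather than System.currentTimeMillis, because the latter has millisecond granularity, so per-record durations shorter than a millisecond tend to contribute 0 to the sum.

```scala
import java.util.concurrent.atomic.AtomicLong

object PerStepTiming {
  // What run() reports; durations are in nanoseconds
  final case class Report(nanosBeforeAction: Long, count: Int,
                          flatMapNanos: Long, mapNanos: Long, mapCalls: Long)

  // Hypothetical per-record work, standing in for "doSomething"
  def doSomething(n: Int): Seq[Int] = Seq(n, n * 2)

  def run(input: Seq[Int]): Report = {
    // One "accumulator" per transformation; AtomicLong stands in for LongAccumulator
    val flatMapNanos = new AtomicLong(0L)
    val mapNanos = new AtomicLong(0L)
    val mapCalls = new AtomicLong(0L)

    // Lazy chain, like RDD transformations: nothing executes on this line
    val edges = input.iterator
      .flatMap { x =>
        val t1 = System.nanoTime()
        val result = doSomething(x)
        flatMapNanos.addAndGet(System.nanoTime() - t1)
        result
      }
      .map { y =>
        val t1 = System.nanoTime()
        val result = y + 1
        mapNanos.addAndGet(System.nanoTime() - t1)
        mapCalls.incrementAndGet()
        result
      }

    // Still zero: no record has been processed yet
    val before = flatMapNanos.get()
    // Consuming the iterator plays the role of edges.count and forces evaluation
    val count = edges.size
    Report(before, count, flatMapNanos.get(), mapNanos.get(), mapCalls.get())
  }

  def main(args: Array[String]): Unit =
    println(run(Seq(1, 2, 3)))
}
```

Two points carry over to the real Spark version. The accumulated value is a sum over all records across all tasks running in parallel, so it reflects time spent inside the function rather than the wall-clock duration of the stage, and it can exceed that duration. Also, Spark only guarantees that accumulator updates made inside actions are applied once per task; updates made inside transformations, as here, may be applied more than once if a task or stage is re-executed.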
Disclaimers:
Style Note:
You can make this code more reusable by creating a measure function that "wraps" around any function and updates a given accumulator:
// write this once:
import org.apache.spark.util.LongAccumulator

def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => {
  val t1 = System.currentTimeMillis
  val result = action(input)
  val t2 = System.currentTimeMillis
  acc.add(t2 - t1)
  result
}
// use it with any transformation:
rdd.flatMap(measure(doSomething, durationAccumulator))
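If you want to try the wrapper outside Spark, the sketch below keeps the same shape as measure but, as an assumption for illustration, takes a plain Long => Unit sink instead of a LongAccumulator (with Spark you could pass ns => acc.add(ns)). It measures with System.nanoTime and applies the wrapper to an ordinary List.

```scala
object MeasureDemo {
  // Same shape as the measure function above, but the sink is any Long => Unit
  // (assumption for illustration; with Spark you could pass ns => acc.add(ns))
  def measure[T, R](action: T => R, sink: Long => Unit): T => R = input => {
    val t1 = System.nanoTime()
    val result = action(input)
    sink(System.nanoTime() - t1) // report elapsed nanoseconds for this call
    result
  }

  // Returns (results, number of timed calls, total nanoseconds)
  def run(): (List[Int], Int, Long) = {
    // Local vars work here only because everything runs in one JVM thread
    var totalNanos = 0L
    var calls = 0
    val square = measure((x: Int) => x * x, (ns: Long) => { totalNanos += ns; calls += 1 })
    val out = List(1, 2, 3).map(square)
    (out, calls, totalNanos)
  }

  def main(args: Array[String]): Unit = {
    val (out, calls, nanos) = run()
    println(s"$out: $calls calls, $nanos ns in total")
  }
}
```

Taking a function as the sink keeps measure independent of Spark, which makes it easy to unit-test. The local-var sink is only suitable for this single-threaded demo: inside a Spark closure, a captured var is serialized to the executors and their updates are not sent back to the driver, which is why the answer uses an accumulator.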