In below screenshot of Spark admin running on port 8080 :
The "Shuffle Read" & "Shuffle Write" parameters are always empty for this code :
import org.apache.spark.SparkContext; object first { println("Welcome to the Scala worksheet") val conf = new org.apache.spark.SparkConf() .setMaster("local") .setAppName("distances") .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4") .set("spark.executor.memory", "2g") val sc = new SparkContext(conf) def euclDistance(userA: User, userB: User) = { val subElements = (userA.features zip userB.features) map { m => (m._1 - m._2) * (m._1 - m._2) } val summed = subElements.sum val sqRoot = Math.sqrt(summed) println("value is" + sqRoot) ((userA.name, userB.name), sqRoot) } case class User(name: String, features: Vector[Double]) def createUser(data: String) = { val id = data.split(",")(0) val splitLine = data.split(",") val distanceVector = (splitLine.toList match { case h :: t => t }).map(m => m.toDouble).toVector User(id, distanceVector) } val dataFile = sc.textFile("c:\\data\\example.txt") val users = dataFile.map(m => createUser(m)) val cart = users.cartesian(users) // val distances = cart.map(m => euclDistance(m._1, m._2)) //> distances : org.apache.spark.rdd.RDD[((String, String), Double)] = MappedR //| DD[4] at map at first.scala:46 val d = distances.collect // d.foreach(println) //> ((a,a),0.0) //| ((a,b),0.0) //| ((a,c),1.0) //| ((a,),0.0) //| ((b,a),0.0) //| ((b,b),0.0) //| ((b,c),1.0) //| ((b,),0.0) //| ((c,a),1.0) //| ((c,b),1.0) //| ((c,c),0.0) //| ((c,),0.0) //| ((,a),0.0) //| ((,b),0.0) //| ((,c),0.0) //| ((,),0.0) }
Why are "Shuffle Read" & "Shuffle Write" fields empty ? Can above code be tweaked in order to populate these fields so as to understand how
Shuffle Read Size / Records. Total shuffle bytes read, includes both data read locally and data read from remote executors. Shuffle Read Blocked Time is the time that tasks spent blocked waiting for shuffle data to be read from remote machines. Shuffle Remote Reads is the total shuffle bytes read from remote executors.
If you have to do an operation before the join that requires a shuffle, such as aggregateByKey or reduceByKey , you can prevent the shuffle by adding a hash partitioner with the same number of partitions as an explicit argument to the first operation before the join.
The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions, based on your data size you may need to reduce or increase the number of partitions of RDD/DataFrame using spark.
Shuffling means the reallocation of data between multiple Spark stages. "Shuffle Write" is the sum of all written serialized data on all executors before transmitting (normally at the end of a stage) and "Shuffle Read" means the sum of read serialized data on all executors at the beginning of a stage.
Your programm has only one stage, triggered by the "collect" operation. No shuffling is required, because you have only a bunch of consecutive map operations which are pipelined in one Stage.
Try to take a look at these slides: http://de.slideshare.net/colorant/spark-shuffle-introduction
It could also help to read chapture 5 from the original paper: http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf
I believe you have to run your application in cluster/distributed mode to see any Shuffle read or write values. Typically "shuffle" are triggered by a subset of Spark actions (e.g., groupBy, join, etc)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With