I need to run a Spark program that processes a huge amount of data. I am trying to optimize it by working through the Spark UI and reducing the shuffle.
There are a couple of metrics mentioned, shuffle read and shuffle write. I can roughly tell the difference from their names, but I would like to understand what exactly they mean and which of the two, shuffle read or shuffle write, hurts performance.
I have searched the internet but could not find solid, in-depth details about them, so I wanted to see if anyone can explain them here.
Input: Bytes read from storage in this stage.
Output: Bytes written in storage in this stage.
Shuffle read: Total shuffle bytes and records read, includes both data read locally and data read from remote executors.
Shuffle write: Bytes and records written to disk in order to be read by a shuffle in a future stage.
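To make those four metrics concrete, here is a minimal Spark (Scala) sketch; the paths and column names are made up for illustration. The stage that scans the files reports Input, the groupBy forces a shuffle (Shuffle Write on the map side, Shuffle Read on the reduce side), and the final save reports Output.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object ShuffleMetricsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shuffle-metrics-demo")
      .getOrCreate()

    // Scanning the files shows up as "Input" in this stage.
    val events = spark.read.parquet("/data/events")

    // groupBy is a wide transformation: the scan stage writes shuffle files
    // ("Shuffle Write"), and the aggregation stage fetches them ("Shuffle Read").
    val totals = events
      .groupBy("userId")
      .agg(sum("amount").as("total"))

    // Writing the result shows up as "Output".
    totals.write.parquet("/data/event_totals")

    spark.stop()
  }
}
```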
The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that it is grouped differently across partitions. Depending on your data size, you may need to reduce or increase the number of partitions of the RDD/DataFrame (for Spark SQL, via the spark.sql.shuffle.partitions setting).
Spark gathers the required data from each partition and combines it into a new partition. During a shuffle, data is written to disk and transferred across the network. As a result, the shuffle operation is bound by local disk capacity.
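A short sketch of the two usual knobs for that re-partitioning, with illustrative values and a hypothetical input path: spark.sql.shuffle.partitions sets how many partitions Spark SQL produces after a shuffle, while repartition()/coalesce() change the partitioning of a specific DataFrame.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-partition-tuning")
  // Default is 200; a large shuffle may want more partitions, a small one fewer.
  .config("spark.sql.shuffle.partitions", "400")
  .getOrCreate()

val df = spark.read.parquet("/data/events")      // hypothetical input path

// Explicit shuffle into 400 partitions, hashed by key.
val reshaped = df.repartition(400, df("userId"))

// Shrink the partition count without a full shuffle.
val compacted = reshaped.coalesce(50)
```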
From the UI tooltip
Shuffle Read
Total shuffle bytes and records read (includes both data read locally and data read from remote executors)
Shuffle Write
Bytes and records written to disk in order to be read by a shuffle in a future stage
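If it helps, the same numbers the UI aggregates per stage can also be pulled programmatically. A rough sketch (class name is my own) of a SparkListener that logs shuffle read/write bytes per task:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class ShuffleLoggingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      // Local + remote bytes fetched by this task.
      val read  = metrics.shuffleReadMetrics.totalBytesRead
      // Bytes written to shuffle files for a future stage to read.
      val write = metrics.shuffleWriteMetrics.bytesWritten
      println(s"stage ${taskEnd.stageId}: shuffle read=$read B, shuffle write=$write B")
    }
  }
}

// Register it on an existing SparkContext, e.g.:
// sc.addSparkListener(new ShuffleLoggingListener())
```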
I've recently begun working with Spark. I have been looking for answers to the same sort of questions.
When the data from one stage is shuffled to the next stage over the network, the executor(s) that process the next stage pull the data from the first stage's process through TCP. I noticed the shuffle "write" and "read" metrics for each stage are displayed in the Spark UI for a particular job. A stage may also have an "input" size (e.g. input from HDFS or a Hive table scan).
I noticed that the shuffle write size from one stage that fed into another stage did not match that stage's shuffle read size. If I remember correctly, there are reducer-type operations (map-side combines, such as the one reduceByKey performs) that can be applied to the shuffle data before it is transferred to the next stage/executor as an optimization. Maybe this contributes to the difference in size and therefore the relevance of reporting both values.
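For what it's worth, here is a small RDD sketch (made-up data) of the map-side combine I mean: reduceByKey pre-aggregates within each partition before the shuffle, so the map stage's Shuffle Write is usually much smaller than for the equivalent groupByKey job.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("map-side-combine").getOrCreate()
val sc = spark.sparkContext

// One million records, only 100 distinct keys.
val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, 1L))

// No map-side combine: every (key, 1L) record goes into the shuffle files.
val grouped = pairs.groupByKey().mapValues(_.sum)

// Map-side combine: each partition pre-aggregates to at most 100 records
// before anything is shuffled, shrinking both Shuffle Write and Shuffle Read.
val reduced = pairs.reduceByKey(_ + _)

grouped.count()
reduced.count()
```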