
Elaboration on why shuffle write data is way more than input data in Apache Spark

[Spark UI screenshot: stage summary showing 1409 finished tasks, Input 150.1 GB, Shuffle Write 874 GB]

  1. Can anyone elaborate on what exactly Input, Output, Shuffle Read, and Shuffle Write signify in the Spark UI?
  2. Also, can someone explain how the input in this job is only 25~30% of the shuffle write? As per my understanding, shuffle write is the sum of temporary data that cannot be held in memory and data that needs to be sent to other executors during aggregation or reduce operations.

Code below:

import org.apache.spark.sql.Row

// Read one partition of the Hive table, key each record by (client, hash),
// keep the earliest d1 and latest d2 per key, then count the distinct keys.
hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map { case (row: Row) =>
        ((row.getString(0), row.getString(12)),              // key: (client, hash)
         (row.getTimestamp(11), row.getTimestamp(11), row))  // value: (d1, d2, full row)
    }
    .filter { case ((client, hash), (d1, d2, obj)) => d1 != null && d2 != null }
    .reduceByKey { case (x, y) =>
        if (x._1.before(y._1)) {
            if (x._2.after(y._2)) x
            else (x._1, y._2, y._3)
        } else {
            if (x._2.after(y._2)) (y._1, x._2, x._3)
            else y
        }
    }
    .count()

Where ReadDailyFileDataObject is a case class that holds the row fields as a container. The container is required because there are 30 columns, which exceeds Scala's tuple limit of 22.

Updated the code and removed the case class, as I see the same issue when I use Row itself instead of the case class.

Currently I see:

Tasks: 10/7772

Input: 2.1 GB

Shuffle Write: 14.6 GB

If it helps, I am trying to process a table stored as Parquet, containing 21 billion rows.

Below are the parameters I am using:

"spark.yarn.am.memory" -> "10G"
"spark.yarn.am.cores"  -> "5"
"spark.driver.cores"   -> "5"
"spark.executor.cores" -> "10"
"spark.dynamicAllocation.enabled" -> "true"
"spark.yarn.containerLauncherMaxThreads" -> "120"
"spark.executor.memory" -> "30g"
"spark.driver.memory" -> "10g"
"spark.driver.maxResultSize" -> "9g"
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
"spark.kryoserializer.buffer" -> "10m"
"spark.kryoserializer.buffer.max" -> "2001m"
"spark.akka.frameSize" -> "2020"

The SparkContext is created as:

new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)
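
For reference, here is a minimal sketch of how such a configuration might be wired together. The key/value pairs and SPARK_SCALA_APP_NAME come from the listing above, and setAll plus the three-argument SparkContext constructor are standard Spark APIs, but the exact assembly below is an assumption, not the asker's actual code:

import org.apache.spark.{SparkConf, SparkContext}

// Assumed assembly of the settings listed above into a SparkConf.
val settings = Seq(
    "spark.executor.memory" -> "30g",
    "spark.executor.cores"  -> "10",
    "spark.serializer"      -> "org.apache.spark.serializer.KryoSerializer"
    // ...plus the remaining key/value pairs from the list above
)

val sparkConf = new SparkConf()
    .setAppName(SPARK_SCALA_APP_NAME) // assumed to be defined elsewhere
    .setAll(settings)

val sc = new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)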

On YARN, I see:

Allocated CPU VCores: 95

Allocated Memory: 309 GB

Running Containers: 10

asked Mar 29 '16 by Abhishek Anand


People also ask

Why do we shuffle data in Spark?

The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that it is grouped differently across partitions. Depending on your data size, you may need to reduce or increase the number of partitions of an RDD/DataFrame via the spark.sql.shuffle.partitions setting.
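
As an illustration (not from the original question), two hedged examples of tuning that partition count; hiveContext comes from the question's code above, while keyedRdd, mergeFn and the value 2000 are placeholders:

// Spark SQL: number of partitions used when shuffling for joins/aggregations.
hiveContext.setConf("spark.sql.shuffle.partitions", "2000")

// Plain RDDs: reduceByKey accepts an explicit partition count for the shuffle.
val reduced = keyedRdd.reduceByKey(mergeFn, 2000)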

What is shuffle read and shuffle write in Spark?

Input: Bytes read from storage in this stage. Output: Bytes written in storage in this stage. Shuffle read: Total shuffle bytes and records read, includes both data read locally and data read from remote executors. Shuffle write: Bytes and records written to disk in order to be read by a shuffle in a future stage.

Where does Spark write shuffle data?

Spark gathers the required data from each partition and combines it into a new partition. During a shuffle, data is written to disk and transferred across the network. As a result, the shuffle operation is bound to local disk capacity.
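
As a side note (not part of the original question), the scratch location for those shuffle files is configurable; a minimal sketch, keeping in mind that on YARN the NodeManager's local directories take precedence over spark.local.dir:

import org.apache.spark.SparkConf

// Illustrative only: pointing shuffle/spill scratch space at disks with enough room.
// On YARN this setting is overridden by yarn.nodemanager.local-dirs.
val conf = new SparkConf()
    .set("spark.local.dir", "/mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp") // hypothetical paths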


2 Answers

The tooltips shown when you hover your mouse over Input, Output, Shuffle Read, and Shuffle Write explain them quite well:

Input: Bytes and records read from Hadoop or from Spark storage.

Output: Bytes and records written to Hadoop.

Shuffle Write: Bytes and records written to disk in order to be read by a shuffle in a future stage.

Shuffle Read: Total shuffle bytes and records read (includes both data read locally and data read from remote executors).

In your situation, the 150.1 GB accounts for the total input size of the 1409 finished tasks (i.e., the total amount read from HDFS so far), and the 874 GB accounts for what those 1409 tasks have written to the nodes' local disks.
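
To make the mapping concrete, here is a small illustrative job (paths and names are placeholders, not from the question) annotated with the metric each step feeds:

// Illustrative only: which UI metric each step contributes to.
val lines = sc.textFile("hdfs:///data/events")      // stage 1 Input: bytes read from HDFS
val counts = lines
    .map(line => (line.split(",")(0), 1))
    .reduceByKey(_ + _)                             // stage 1 Shuffle Write: map-side output for the shuffle
                                                    // stage 2 Shuffle Read: those bytes fetched, locally or remotely
counts.saveAsTextFile("hdfs:///out/counts")         // stage 2 Output: bytes written to HDFS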

You can refer to What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming? to understand the overall shuffle functionality well.

answered Oct 17 '22 by yjshen


It's actually hard to provide an answer without the code, but it is possible that you are going through your data multiple times, so the total volume you are processing is actually "X" times your original data.

Can you post the code you are running?

EDIT

Looking at the code, I have had this kind of issue before, and it was due to the serialization of the Row, so this might be your case as well.

What is "ReadDailyFileDataObject"? Is it a class, a case class?

I would first try running your code like this:

import java.sql.Timestamp
import org.apache.spark.sql.Row

// Same job, but carrying only the two timestamps through the shuffle
// instead of the whole Row, to cut down the shuffled volume.
hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map { case (row: Row) =>
        ((row.get(0).asInstanceOf[String], row.get(12).asInstanceOf[String]),         // key: (client, hash)
         (row.get(11).asInstanceOf[Timestamp], row.get(11).asInstanceOf[Timestamp]))  // value: (d1, d2)
    }
    .filter { case ((client, hash), (d1, d2)) => d1 != null && d2 != null }
    .reduceByKey { case (x, y) =>
        if (x._1.before(y._1)) {
            if (x._2.after(y._2)) x
            else (x._1, y._2)
        } else {
            if (x._2.after(y._2)) (y._1, x._2)
            else y
        }
    }
    .count()

If that gets rid of your shuffling problem, then you can refactor it a little, as sketched below:

  - Make it a case class, if it isn't already.
  - Create it like "ReadDailyFileDataObject(row.getInt(0), row.getString(1), etc..)"
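
A minimal sketch of that refactor; the field names and column indices are hypothetical, since the actual layout of ReadDailyFileDataObject is not shown in the question:

// Hypothetical container holding only the columns the job needs,
// so far less data is serialized and shuffled than with the full Row.
case class ReadDailyFileDataObject(
    client: String,                 // assumed column 0
    hash: String,                   // assumed column 12
    eventTime: java.sql.Timestamp)  // assumed column 11

// Build it from typed getters instead of carrying the whole Row around.
val typedRdd = hiveContext
    .sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map(row => ReadDailyFileDataObject(
        row.getString(0),
        row.getString(12),
        row.getTimestamp(11)))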

Hope this counts as an answer, and helps you find your bottleneck.

answered Oct 17 '22 by alghimo