Code below:
hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
  .rdd
  .map { case (row: Row) =>
    ((row.getString(0), row.getString(12)),
     (row.getTimestamp(11), row.getTimestamp(11), row))
  }
  .filter { case ((client, hash), (d1, d2, obj)) => d1 != null && d2 != null }
  .reduceByKey { case (x, y) =>
    if (x._1.before(y._1)) {
      if (x._2.after(y._2)) x
      else (x._1, y._2, y._3)
    } else {
      if (x._2.after(y._2)) (y._1, x._2, x._3)
      else y
    }
  }
  .count()
Where ReadDailyFileDataObject is a case class that holds the row fields as a container. The container is required because there are 30 columns, which exceeds Scala's tuple limit of 22.
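For context, such a container would look roughly like the sketch below (field names and types are hypothetical; only a few of the 30 columns are shown):

// Hypothetical sketch of the container case class; the real schema has 30 columns.
case class ReadDailyFileDataObject(
  client: String,
  hash: String,
  eventTime: java.sql.Timestamp
  // ... remaining columns ...
)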
Updated code: I removed the case class, as I see the same issue when I use Row itself instead of the case class.
Currently I see:
Task : 10/7772
Input : 2.1 GB
Shuffle Write : 14.6 GB
If it helps, I am trying to process a table stored as a Parquet file, containing 21 billion rows.
Below are the parameters I am using:
"spark.yarn.am.memory" -> "10G"
"spark.yarn.am.cores" -> "5"
"spark.driver.cores" -> "5"
"spark.executor.cores" -> "10"
"spark.dynamicAllocation.enabled" -> "true"
"spark.yarn.containerLauncherMaxThreads" -> "120"
"spark.executor.memory" -> "30g"
"spark.driver.memory" -> "10g"
"spark.driver.maxResultSize" -> "9g"
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
"spark.kryoserializer.buffer" -> "10m"
"spark.kryoserializer.buffer.max" -> "2001m"
"spark.akka.frameSize" -> "2020"
SparkContext is registered as
new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)
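For completeness, a sketch of how those parameters might be wired into the SparkConf passed in above (only a few of the listed settings are repeated here; SPARK_SCALA_APP_NAME is assumed to be defined elsewhere in the application):

// Sketch: applying the settings listed above to a SparkConf.
// Requires: import org.apache.spark.{SparkConf, SparkContext}
val sparkConf = new SparkConf()
  .set("spark.executor.memory", "30g")
  .set("spark.executor.cores", "10")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "2001m")
  // ... remaining settings from the list above ...

val sc = new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)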
On YARN, I see:
Allocated CPU VCores : 95
Allocated Memory : 309 GB
Running Containers : 10
The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that it is grouped differently across partitions. Depending on your data size, you may need to reduce or increase the number of shuffle partitions of the RDD/DataFrame.
Input: Bytes read from storage in this stage.
Output: Bytes written to storage in this stage.
Shuffle read: Total shuffle bytes and records read, including both data read locally and data read from remote executors.
Shuffle write: Bytes and records written to disk in order to be read by a shuffle in a future stage.
Spark gathers the required data from each partition and combines it into a new partition. During a shuffle, data is written to disk and transferred across the network, so the shuffle operation is bound by local disk capacity.
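As noted above, tuning the number of partitions is the usual first lever. A minimal sketch of the standard knobs (the config key and methods are stock Spark APIs; the values and the keyedRdd/mergeFn/someRdd names are placeholders, not from the question):

// Sketch: common ways to control how many partitions a shuffle produces.
// Values are placeholders; tune them against your data volume.
hiveContext.setConf("spark.sql.shuffle.partitions", "400")    // DataFrame/SQL shuffles

val reduced = keyedRdd.reduceByKey(mergeFn, 400)               // explicit partition count for an RDD shuffle
val resized = someRdd.repartition(400)                         // or repartition/coalesce before heavy stages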
The tooltips shown when you hover your mouse over Input, Output, Shuffle Read, and Shuffle Write explain themselves quite well:
INPUT: Bytes and records read from Hadoop or from Spark storage.
OUTPUT: Bytes and records written to Hadoop.
SHUFFLE_WRITE: Bytes and records written to disk in order to be read by a shuffle in a future stage.
SHUFFLE_READ: Total shuffle bytes and records read (includes both data read locally and data read from remote executors).
In your situation, 150.1 GB accounts for the input size of all 1409 finished tasks (i.e., the total size read from HDFS so far), and 874 GB accounts for what all 1409 finished tasks have written to the nodes' local disks.
You can refer to "What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?" to understand the overall shuffle functionality well.
It's actually hard to provide an answer without the code, but it is possible that you are going through your data multiple times, so the total volume you are processing is actually "X" times your original data.
Can you post the code you are running?
EDIT
Looking at the code, I have had this kind of issue before, and it was due to the serialization of the Row, so this might be your case as well.
What is "ReadDailyFileDataObject"? Is it a class, a case class?
I would first try running your code like this:
// Requires: import org.apache.spark.sql.Row and import java.sql.Timestamp
hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
  .rdd
  .map { case (row: Row) =>
    ((row.get(0).asInstanceOf[String], row.get(12).asInstanceOf[String]),
     (row.get(11).asInstanceOf[Timestamp], row.get(11).asInstanceOf[Timestamp]))
  }
  .filter { case ((client, hash), (d1, d2)) => d1 != null && d2 != null }
  .reduceByKey { case (x, y) =>
    if (x._1.before(y._1)) {
      if (x._2.after(y._2)) x
      else (x._1, y._2)
    } else {
      if (x._2.after(y._2)) (y._1, x._2)
      else y
    }
  }
  .count()
If that gets rid of your shuffling problem, then you can refactor it a little:
- Make it a case class, if it isn't already.
- Create it like "ReadDailyFileDataObject(row.getInt(0), row.getString(1), etc..)"
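Roughly, that refactor would look like the sketch below, continuing the hypothetical container from earlier in the post (column positions, field names, and types are illustrative, not the actual 30-column schema):

// Hypothetical sketch: build the case class explicitly from the Row's columns
// instead of carrying the whole Row through the shuffle.
val parsed = hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
  .rdd
  .map(row => ReadDailyFileDataObject(row.getString(0), row.getString(12), row.getTimestamp(11) /*, ... */))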
Hope this counts as an answer, and helps you find your bottleneck.