Spark: shuffle operation leading to long GC pause

I'm running Spark 2 and am trying to shuffle around 5 terabytes of JSON. I'm running into very long garbage collection pauses while shuffling a Dataset:

// Read the JSON input as a typed Dataset
val operations = spark.read.json(inPath).as[MyClass]
// Repartition by id (a full shuffle) and write the result out as Parquet
operations.repartition(partitions, operations("id")).write.parquet("s3a://foo")

Are there any obvious configuration tweaks to deal with this issue? My configuration is as follows:

spark.driver.maxResultSize 6G
spark.driver.memory 10G
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError
spark.executor.memory   32G
spark.hadoop.fs.s3a.buffer.dir  /raid0/spark
spark.hadoop.fs.s3n.buffer.dir  /raid0/spark
spark.hadoop.fs.s3n.multipart.uploads.enabled   true
spark.hadoop.parquet.block.size 2147483648
spark.hadoop.parquet.enable.summary-metadata    false
spark.local.dir /raid0/spark
spark.memory.fraction 0.8
spark.mesos.coarse  true
spark.mesos.constraints  priority:1
spark.mesos.executor.memoryOverhead 16000
spark.network.timeout   600
spark.rpc.message.maxSize    1000
spark.speculation   false
spark.sql.parquet.mergeSchema   false
spark.sql.planner.externalSort  true
spark.submit.deployMode client
spark.task.cpus 1
asked Aug 16 '16 by Luke

People also ask

What is the Spark shuffle?

The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation. Stages, tasks, and shuffle writes and reads are concrete concepts that can be monitored from the Spark UI.
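As a concrete illustration (a sketch, assuming a spark-shell session where spark is predefined; the input path and column name are hypothetical), a grouping operation forces a shuffle, which appears as an Exchange in the query plan and as shuffle read/write metrics in the UI:

val events = spark.read.json("s3a://bucket/events")   // hypothetical input path
val counts = events.groupBy("id").count()             // forces a hash-partitioned shuffle
counts.explain()                                      // plan shows Exchange hashpartitioning(id, ...)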

What is parallelising in the Spark shuffle?

Parallelising the shuffle effectively is key to good Spark job performance. Shuffle operations repartition Spark DataFrames, so the number of partitions coming out of a shuffle can differ from the number of partitions in the original DataFrame.
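As a hedged sketch (again assuming a spark-shell session; the input path is made up), the shuffled side of a job defaults to spark.sql.shuffle.partitions rather than the input's partition count, and can be tuned:

val events = spark.read.json("s3a://bucket/events")    // hypothetical input
println(events.rdd.getNumPartitions)                   // partitioning of the original DataFrame

spark.conf.set("spark.sql.shuffle.partitions", "1000") // default is 200
val grouped = events.groupBy("id").count()
println(grouped.rdd.getNumPartitions)                  // shuffled output now has 1000 partitions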

What is shuffle reduction in Spark?

During a shuffle, data is written to disk and transferred across the network. Reduction means cutting the number of shuffle operations, or failing that, the amount of data being shuffled. By default, Spark's shuffle uses hash partitioning to determine which machine each key-value pair is sent to.
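For example, here is a minimal RDD sketch (the data is made up) showing why pre-aggregation reduces shuffled data: reduceByKey combines values within each partition before the hash-partitioned exchange, while groupByKey ships every pair across the network:

val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// groupByKey shuffles every (key, value) pair before summing
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums within each partition first, shuffling far less data
val viaReduce = pairs.reduceByKey(_ + _)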

How to reduce long GC pause time?

A high garbage collection rate will increase the GC pause time as well. Thus, optimizing the application to create fewer objects is the most effective strategy to reduce long GC pauses. This can be a time-consuming exercise, but it is well worth doing.
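One way to apply that in a Spark job (a sketch, not taken from the question; the RDD and date format are illustrative) is to allocate heavyweight objects once per partition with mapPartitions instead of once per record:

import java.text.SimpleDateFormat
import java.util.Date

val dates = spark.sparkContext.parallelize(Seq.fill(1000)(new Date))

// Allocates a new formatter for every record, feeding the garbage collector
val slow = dates.map(d => new SimpleDateFormat("yyyy-MM-dd").format(d))

// Allocates one formatter per partition and reuses it for every record
val fast = dates.mapPartitions { iter =>
  val fmt = new SimpleDateFormat("yyyy-MM-dd")
  iter.map(d => fmt.format(d))
}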


1 Answer

Adding the following flags got rid of the GC pauses.

spark.executor.extraJavaOptions -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12

I think it does take a fair amount of tweaking, though. This Databricks post was very helpful.
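For reference, the same executor JVM options could also be set when building the session rather than in spark-defaults.conf (a sketch; the app name is a placeholder, and executor options have to be set before the executors launch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-job")  // placeholder name
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12")
  .getOrCreate()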

answered Nov 15 '22 by Luke