
Spark partitionBy doesn't scale as expected

INPUT:

The input data set contains 10 million transactions spread across multiple Parquet files. The total size of the data set, including all files, ranges from 6 to 8 GB.

PROBLEM STATEMENT:

Partition the transactions by customer ID, creating one folder per customer ID, with each folder containing all the transactions made by that particular customer.

HDFS has a hard limit of 6.4 million on the number of subdirectories that can be created within a single directory, so the last two digits of the customer ID (00, 01, 02, ..., 99) are used to create 100 top-level directories, and each top-level directory contains all the customer IDs ending with those two digits.

Sample output directory structure:

00/cust_id=100900/part1.csv
00/cust_id=100800/part33.csv

01/cust_id=100801/part1.csv
03/cust_id=100803/part1.csv
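
For illustration, the two-digit bucket can be derived directly from the ID; a minimal sketch (the helper name bucketFor is only for this example):

// Minimal sketch: map a customer ID to its two-digit top-level bucket.
// bucketFor is a hypothetical helper used only for illustration.
def bucketFor(customerId: String): String =
  customerId.takeRight(2)   // "100900" -> "00", "100803" -> "03"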

CODE:

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

// Read the input files and cache them in memory
val parquetReader = sparksession.read
  .parquet("/inputs")
  .persist(StorageLevel.MEMORY_ONLY) // No spill occurs; there is enough memory

// Partitioning logic: one write job per two-digit customer ID suffix (00..99)
var customerIdEndingPattern = 0
while (customerIdEndingPattern < 100) {
  val idEndPattern = f"$customerIdEndingPattern%02d" // zero-pad to two digits

  parquetReader
    .filter(col("customer_id").endsWith(idEndPattern))
    .repartition(945, col("customer_id"))
    .write
    .partitionBy("customer_id")
    .option("header", "true")
    .mode("append")
    .csv("/" + idEndPattern)
  customerIdEndingPattern += 1
}

Spark Configuration: Amazon EMR 5.29.0 (Spark 2.4.4 & Hadoop 2.8.5)

1 master and 10 slaves, each with 96 vCores and 768 GB RAM (AWS r5.24xlarge instances). The disks are EBS volumes with a burst of 3000 IOPS for 30 minutes.

            'spark.hadoop.dfs.replication': '3',
            'spark.driver.cores':'5',
            'spark.driver.memory':'32g',
            'spark.executor.instances': '189',
            'spark.executor.memory': '32g',
            'spark.executor.cores': '5',
            'spark.executor.memoryOverhead':'8192',
            'spark.driver.memoryOverhead':'8192',
            'spark.default.parallelism':'945',
            'spark.sql.shuffle.partitions' :'945',
            'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
            'spark.dynamicAllocation.enabled': 'false',
            'spark.memory.fraction':'0.8',
            'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
            'spark.memory.storageFraction':'0.2',
            'spark.task.maxFailures': '6',
            'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"',
            'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"'
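
For reference, the executor count and parallelism above follow from the hardware with a simple back-of-the-envelope calculation (a sketch; the names are used only for illustration):

// Rough executor sizing for the cluster above (illustrative names and arithmetic).
val nodes            = 10  // slave nodes
val vCoresPerNode    = 96  // r5.24xlarge
val coresPerExecutor = 5   // spark.executor.cores

val executorsPerNode = vCoresPerNode / coresPerExecutor  // 19
val totalExecutors   = nodes * executorsPerNode - 1      // 189, presumably leaving one slot for the driver

// 189 executors * 5 cores = 945 concurrent tasks, which matches
// spark.default.parallelism and spark.sql.shuffle.partitions.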

SCALING ISSUES:

  1. Experimented with everything from 10 up to 40 slaves (adjusting the Spark configs accordingly), but the results are the same: the job takes more than 2 hours to complete. As shown in the first screenshot, each job takes more than a minute and the while loop runs 99 times, so 99 sequential jobs at over a minute each accounts for the 2+ hour runtime. Also, reads from remote executors are almost non-existent (which is good); most are process-local.

  2. Partitioning seems to work fine (see the second screenshot): there are 5 RDD blocks per executor instance and 5 tasks running at all times (each instance has 5 cores, and there are 19 instances per slave node). GC is optimized too.

  3. Each partitionBy write in the while loop takes a minute or more to complete.

METRICS:


[Screenshot: sample durations of a few of the 99 jobs]

[Screenshot: partition distribution looks okay]

[Screenshot: summary of one job, i.e. a single partitionBy execution]

[Screenshot: summary of a few instances after full job completion, hence RDD blocks are zero; the first row is the driver]



So the question is: how can this be optimized further, and why isn't it scaling? Is there a better way to go about it? Have I already reached maximum performance? Assuming I have access to more hardware, is there anything I could do better? Any suggestions are welcome.

asked Feb 11 '20 by user1613360




1 Answer

Touching every record 100 times is very inefficient, even if the data can be cached in memory and is not evicted downstream. Not to mention that the persist alone is expensive.

Instead, you could add a virtual column:

import org.apache.spark.sql.functions.substring
import sparksession.implicits._  // for the $"..." column syntax

val df = sparksession.read
  .parquet("/inputs")
  .withColumn("partition_id", substring($"customer_id", -2, 2))

and use it later for partitioning:

df
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")

To avoid too many small files, you can repartition first using a longer suffix:

val nParts: Int = ???
val suffixLength: Int = ???  // >= suffix length used for write partitions

df
  .repartitionByRange(
    nParts,
    substring($"customer_id", -suffixLength, suffixLength)
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")

Such changes will allow you to process all data in a single pass without any explicit caching.
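
Putting the two snippets together, a minimal end-to-end sketch of the single-pass job could look like this (the nParts and suffixLength values are placeholders you would tune for your data):

import org.apache.spark.sql.functions.substring
import sparksession.implicits._  // for the $"..." column syntax

// Single pass: derive the two-digit bucket, range-repartition on a longer
// suffix to control file counts, then write partitioned by bucket and customer.
val nParts = 945       // placeholder: e.g. match the cluster's task parallelism
val suffixLength = 4   // placeholder: must be >= the 2-digit write suffix

sparksession.read
  .parquet("/inputs")
  .withColumn("partition_id", substring($"customer_id", -2, 2))
  .repartitionByRange(nParts, substring($"customer_id", -suffixLength, suffixLength))
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")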

answered Sep 28 '22 by user10938362