
Why is a join in Spark local mode so slow?

I am using Spark in local mode, and a simple join is taking too long. I have fetched two dataframes, A (8 columns, 2.3 million rows) and B (8 columns, 1.2 million rows), joined them with A.join(B, condition, 'left'), and called an action at the end. This creates a single job with three stages: one for extracting each dataframe and one for the join. Surprisingly, the stage that extracts dataframe A takes around 8 minutes and the one for dataframe B takes 1 minute, while the join itself finishes within seconds. My important configuration settings are:

  1. spark.master local[*]
  2. spark.driver.cores 8
  3. spark.executor.memory 30g
  4. spark.driver.memory 30g
  5. spark.serializer org.apache.spark.serializer.KryoSerializer
  6. spark.sql.shuffle.partitions 16

The only executor is the driver itself. While extracting the dataframes, I partitioned them into 32 parts (I also tried 16, 64, 50, 100 and 200). I see a shuffle write of 100 MB for the stage that extracts dataframe A. To avoid the shuffle, I made 16 initial partitions for both dataframes and broadcast dataframe B (the smaller one) using the broadcast(B) syntax, but it is not helping: the shuffle write is still there. Am I doing something wrong? Why is there still a shuffle? Also, the event timeline shows only four cores processing at any point in time, although I have a 2 core * 4 processor machine. Why is that?

Bhanuday Birla asked Oct 30 '22 15:10


1 Answer

In short, "join" <=> shuffling. The big question here is how uniformly your data are distributed over partitions (see, for example, https://0x0fff.com/spark-architecture-shuffle/ and https://www.slideshare.net/SparkSummit/handling-data-skew-adaptively-in-spark-using-dynamic-repartitioning , or just Google the problem). A few possibilities to improve efficiency:

  • think more about your data (A and B) and partition it wisely;
  • analyze whether your data are skewed;
  • go into the Spark UI and look at the task timings;
  • choose partition keys such that during the join only a few partitions of dataset A shuffle with a few partitions of B.
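
To make the skew check above concrete, here is a minimal plain-Python sketch (it does not use Spark) of how join keys spread over hash partitions. The helper names, the sample keys, and the use of zlib.crc32 are assumptions for illustration only; Spark's hash partitioner uses a different hash function, so actual partition assignments will differ.

```python
from collections import Counter
import zlib


def partition_of(key, num_partitions):
    """Assign a key to a partition with a stable hash.

    Assumption: crc32 is only a stand-in for Spark's own hash partitioner.
    """
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


def skew_ratio(sizes):
    """Largest partition divided by the mean partition size (1.0 means even)."""
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean if mean else 0.0


# Hypothetical join keys: one evenly spread set, one dominated by a hot key.
uniform_keys = list(range(16_000))
skewed_keys = ["hot"] * 12_000 + list(range(4_000))

for name, keys in [("uniform", uniform_keys), ("skewed", skewed_keys)]:
    sizes = partition_sizes(keys, 16)
    print(name, "largest partition:", max(sizes),
          "skew ratio:", round(skew_ratio(sizes), 2))
```

A skew ratio far above 1 means that one task receives most of the rows, which tends to show up in the UI as a single long-running task while the other cores sit idle. On a real dataframe, a comparable check would be to count rows per join key and look at the largest groups.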
Danylo Zherebetskyy answered Nov 15 '22 10:11