How to join big dataframes in Spark SQL? (best practices, stability, performance)

I'm getting the same error as in Missing an output location for shuffle when joining big dataframes in Spark SQL. The recommendation there is to set MEMORY_AND_DISK and/or spark.shuffle.memoryFraction to 0. However, spark.shuffle.memoryFraction is deprecated in Spark >= 1.6.0, and setting MEMORY_AND_DISK shouldn't help if I'm not caching any RDD or DataFrame, right? I'm also getting lots of other WARN logs and task retries that lead me to think the job is not stable.

Therefore, my question is:

  • What are best practices to join huge dataframes in Spark SQL >= 1.6.0?

More specific questions are:

  • How to tune the number of executors and spark.sql.shuffle.partitions to achieve better stability/performance?
  • How to find the right balance between the level of parallelism (number of executors/cores) and the number of partitions? I've found that increasing the number of executors is not always the solution, as it may cause I/O read timeout exceptions due to network traffic.
  • Is there any other relevant parameter to be tuned for this purpose?
  • My understanding is that data stored as ORC or Parquet performs better in joins than text or Avro. Is there a significant difference between Parquet and ORC?
  • Is there an advantage of SQLContext vs HiveContext regarding stability/performance for join operations?
  • Is there a difference in performance/stability when the dataframes involved in the join have previously been registered with registerTempTable() or saved with saveAsTable()?

So far I'm using this answer and this chapter as a starting point, and there are a few more Stack Overflow pages related to this subject. Yet I haven't found a comprehensive answer to this popular issue.

Thanks in advance.

asked Jun 23 '16 by leo9r

People also ask

How can I improve my Spark join performance?

Try to use broadcast joins wherever possible, and filter out rows irrelevant to the join key to avoid unnecessary data shuffling. For cases where you are confident that a shuffle hash join will beat a sort-merge join, disable sort-merge join for those scenarios.
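
For illustration, a minimal Scala sketch of a broadcast join (the paths, filter condition, and join key are hypothetical):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder().appName("broadcast-join-sketch").getOrCreate()

    // Hypothetical inputs: one large fact table, one small dimension table.
    val large = spark.read.parquet("/data/large_table")
    val small = spark.read.parquet("/data/small_table")

    // broadcast() ships the small side to every executor, so the large side is
    // joined in place without being shuffled. Spark also broadcasts automatically
    // when the small side is below spark.sql.autoBroadcastJoinThreshold.
    val joined = large
      .filter("status = 'active'")             // prune irrelevant rows before the join
      .join(broadcast(small), Seq("join_key"))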

How do I join two large DataFrames in Spark?

Spark uses sort-merge joins to join large tables. Each row of both tables is hashed on the join key, rows with the same hash are shuffled into the same partition, the keys are sorted on both sides, and the sort-merge algorithm is applied.
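
A quick way to check which strategy the planner picked is to inspect the physical plan. A sketch, assuming a SparkSession named spark and two hypothetical dataframes:

    val joined = ordersDf.join(customersDf, Seq("customer_id"))
    joined.explain()  // look for SortMergeJoin vs BroadcastHashJoin in the plan

    // Since Spark 2.0, you can steer the planner away from sort-merge joins
    // when you know a shuffle hash join will do better:
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")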


1 Answer

Those are a lot of questions. Allow me to answer them one by one:

The number of executors is most of the time variable in a production environment; it depends on the available resources. The number of partitions is important when you are performing shuffles. Assuming your data is not skewed, you can lower the load per task by increasing the number of partitions. A task should ideally take a couple of minutes: if a task takes too long, your container may get pre-empted and the work is lost; if it takes only a few milliseconds, the overhead of starting the task becomes dominant.
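
For instance, a sketch of the two usual knobs, assuming a SparkSession named spark and a hypothetical largeDf (the numbers are illustrative, not recommendations):

    import org.apache.spark.sql.functions.col

    // Raise shuffle parallelism so each task processes a smaller slice
    // (the default for spark.sql.shuffle.partitions is 200):
    spark.conf.set("spark.sql.shuffle.partitions", "800")

    // Or repartition explicitly on the join key before a heavy join:
    val repartitioned = largeDf.repartition(800, col("join_key"))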

For the level of parallelism and tuning your executor sizes, I would refer you to the excellent guide by Cloudera: https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
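
To make the arithmetic from that guide concrete, a sketch for the hypothetical cluster it uses as an example (6 nodes, each with 16 cores and 64 GB of memory):

    import org.apache.spark.SparkConf

    // Leave 1 core and ~1 GB per node for the OS and Hadoop daemons, and cap
    // executors at 5 cores to keep HDFS client throughput healthy:
    //   executors per node  = 15 cores / 5 = 3, minus 1 overall for the driver
    //   memory per executor = 63 GB / 3 = 21 GB, minus ~7% overhead -> ~19 GB
    val conf = new SparkConf()
      .set("spark.executor.instances", "17")
      .set("spark.executor.cores", "5")
      .set("spark.executor.memory", "19g")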

ORC and Parquet only encode the data at rest; when doing the actual join, the data is in Spark's in-memory format. Parquet has been gaining popularity since Netflix and Facebook adopted it and put a lot of effort into it. Parquet lets you store the data more efficiently and has some optimisations (such as predicate pushdown) that Spark exploits.
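
For example, a sketch of the pushdown in action (paths, dataframes, and column names are hypothetical):

    import org.apache.spark.sql.functions.col

    // Persist both sides as Parquet so scans benefit from column pruning
    // and predicate pushdown before the join:
    eventsDf.write.mode("overwrite").parquet("/data/events_parquet")

    val events = spark.read.parquet("/data/events_parquet")
    // This filter can be pushed down into the Parquet reader, so row groups
    // that cannot match are skipped instead of being read and then discarded:
    val recent = events.filter(col("event_date") >= "2016-01-01")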

You should use the SQLContext instead of the HiveContext, since the HiveContext is deprecated. The SQLContext is more general and doesn't only work with Hive.
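
For reference, a minimal sketch: in Spark 2.x both entry points are superseded by SparkSession, which covers the general case and adds Hive support only when asked:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("join-job")
      .enableHiveSupport()  // only needed for the Hive metastore/SerDes
      .getOrCreate()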

registerTempTable doesn't move any data: it only registers the DataFrame's execution plan with the session, and that plan is invoked once an action is performed. It therefore doesn't affect the execution of the join itself. saveAsTable, on the other hand, executes the plan and stores the data on the distributed file system.
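
A small sketch of the difference, assuming a dataframe df and a SparkSession named spark (table names are hypothetical):

    // registerTempTable (createOrReplaceTempView in Spark 2.x) only registers
    // the execution plan under a name; nothing is materialized yet:
    df.registerTempTable("events_tmp")
    val filtered = spark.sql("SELECT * FROM events_tmp WHERE id = 1")  // still lazy

    // saveAsTable actually executes the plan and writes the data out:
    df.write.saveAsTable("events_persisted")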

Hope this helps. I would also suggest watching our Spark Summit talk on doing joins: https://www.youtube.com/watch?v=6zg7NTw-kTQ. It might give you some insights.

Cheers, Fokko

answered Oct 11 '22 by Fokko Driesprong