
What is an optimized way of joining large tables in Spark SQL

I need to join tables using Spark SQL or the DataFrame API, and I'd like to know the optimized way of doing it.

Scenario is:

  1. All data is present in Hive in ORC format (the base DataFrame and the reference files).
  2. I need to join one base DataFrame read from Hive with 11-13 other reference files to create a big in-memory structure (around 400 columns and 1 TB in size).

What would be the best approach to achieve this? Please share your experience if you have encountered a similar problem.

asked Jun 15 '16 by S. K


People also ask

How will you join two large datasets in Spark if they don't fit in memory?

Sticking to the use cases mentioned above, Spark will perform (or be forced by us to perform) joins in two different ways: either using a sort-merge join if we are joining two big tables, or a broadcast join if at least one of the datasets involved is small enough to be stored in the memory of all executors.

Which join is faster in PySpark?

Broadcast joins are easier to run on a cluster. Spark can "broadcast" a small DataFrame by sending all of its data to every node in the cluster. After the broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.
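
For illustration, a minimal sketch of a broadcast join in Scala; the table names and the join column "key" are placeholders:

import org.apache.spark.sql.functions.broadcast

// assumes an existing SparkSession named `spark` with Hive support;
// the table names below are hypothetical
val largeDf = spark.table("my_db.base_table")       // stays distributed
val smallDf = spark.table("my_db.small_reference")  // must fit in executor memory

// broadcast() ships smallDf to every executor, so the join runs without
// shuffling largeDf
val joined = largeDf.join(broadcast(smallDf), Seq("key"))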

How do I speed up a cross join in Spark?

To make the computation faster, reduce the number of partitions of the input DataFrames before the cross join, so that the resulting cross-joined DataFrame doesn't have too many partitions.
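
A short sketch of that idea, with hypothetical table names and partition counts:

// assumes an existing SparkSession named `spark`; table names and the
// partition count are hypothetical
val left  = spark.table("my_db.left_table").coalesce(8)
val right = spark.table("my_db.right_table").coalesce(8)

// a cross join yields roughly left-partitions x right-partitions output
// partitions, so shrinking the inputs keeps the result manageable
val crossed = left.crossJoin(right)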


2 Answers

My default advice on how to optimize joins is:

  1. Use a broadcast join if you can (see this notebook). From your question it seems your tables are large and a broadcast join is not an option.

  2. Consider using a very large cluster (it's cheaper than you may think). $250 right now (6/2016) buys about 24 hours of 800 cores with 6 TB of RAM and many SSDs on the EC2 spot instance market. When thinking about the total cost of a big data solution, I find that humans tend to substantially undervalue their time.

  3. Use the same partitioner. See this question for information on co-grouped joins.

  4. If the data is huge and/or your clusters cannot grow such that even (3) above leads to OOM, use a two-pass approach. First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table (a minimal sketch follows the side note below).

Side note: I say "appending" above because in production I never use SaveMode.Append. It is not idempotent and that's a dangerous thing. I use SaveMode.Overwrite deep into the subtree of a partitioned table tree structure. Prior to 2.0.0 and 1.6.2 you'll have to delete _SUCCESS or metadata files or dynamic partition discovery will choke.
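
Here is a minimal sketch of the two-pass approach, with a hypothetical partition column part_key, join key join_key, source tables, and output path standing in for the real ones:

import org.apache.spark.sql.SparkSession

// all table names, the partition column `part_key`, the join key `join_key`,
// and the output path are hypothetical; `part_key` is assumed to be a string
val spark = SparkSession.builder().appName("two-pass-join").enableHiveSupport().getOrCreate()
import spark.implicits._

val baseDf = spark.table("my_db.base")
val refDf  = spark.table("my_db.reference")

// pass 1: persist both sides partitioned by the same column
baseDf.write.partitionBy("part_key").mode("overwrite").saveAsTable("my_db.base_parted")
refDf.write.partitionBy("part_key").mode("overwrite").saveAsTable("my_db.ref_parted")

// pass 2: join one sub-partition at a time, overwriting just that slice of the result
val keys = spark.table("my_db.base_parted").select("part_key").distinct().as[String].collect()

keys.foreach { k =>
  val left  = spark.table("my_db.base_parted").where($"part_key" === k)
  val right = spark.table("my_db.ref_parted").where($"part_key" === k)

  left.join(right, Seq("join_key"))
    .write
    .mode("overwrite")  // idempotent per slice, unlike SaveMode.Append
    .parquet(s"/warehouse/result/part_key=$k")  // one output directory per key
}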

Hope this helps.

answered by Sim


Spark uses sort-merge joins to join large tables. This consists of hashing the join keys of each row in both tables and shuffling rows with the same hash into the same partition. There, the keys are sorted on both sides and the sort-merge algorithm is applied. That's the best approach as far as I know.
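
As a rough sketch (table names are hypothetical), you can disable the broadcast threshold and inspect the plan to see the sort-merge join being chosen:

// assumes an existing SparkSession named `spark`
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // disable broadcast joins

val bigJoin = spark.table("my_db.big_table_1")
  .join(spark.table("my_db.big_table_2"), Seq("A", "B"))

bigJoin.explain()  // physical plan should show SortMergeJoin preceded by Exchange (shuffle) steps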

To drastically speed up your sort-merge joins, write your large datasets as Hive tables with the pre-bucketing and pre-sorting options (using the same number of buckets for both tables) instead of as flat Parquet datasets.

// bucket and sort both tables on the join keys, with the same number of buckets
tableA
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_a")

tableB
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_b")

The overhead cost of writing pre-bucketed/pre-sorted tables is modest compared to the benefits.

The underlying dataset will still be Parquet by default, but the Hive metastore (which can be the Glue metastore on AWS) will contain precious information about how the table is structured. Because all possible "joinable" rows are colocated, Spark won't shuffle tables that are pre-bucketed (big savings!) and won't sort the rows within the partitions of tables that are pre-sorted.

val joined = tableA.join(tableB, Seq("A", "B"))

Look at the execution plan with and without pre-bucketing.
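
For example, assuming the bucketed tables above and hypothetical paths for flat copies of the same data:

// with matching buckets and sort order on both sides, the physical plan should
// show a SortMergeJoin with no Exchange (shuffle) before it
joined.explain()

// for comparison, the same join over flat, un-bucketed Parquet copies of the
// data (hypothetical paths) shows Exchange hashpartitioning steps
spark.read.parquet("/tmp/table_a_flat")
  .join(spark.read.parquet("/tmp/table_b_flat"), Seq("A", "B"))
  .explain()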

This will not only save you a lot of time during your joins, it will also make it possible to run very large joins on a relatively small cluster without OOMing. At Amazon, we use this in prod most of the time (there are still a few cases where it is not required).

To know more about pre-bucketing/pre-sorting:

  • https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
  • https://data-flair.training/blogs/bucketing-in-hive/
  • https://mapr.com/blog/tips-and-best-practices-to-take-advantage-of-spark-2-x/
  • https://databricks.com/session/hive-bucketing-in-apache-spark
answered by Boris