 

Partition data for efficient joining for Spark dataframe/dataset

I need to join many DataFrames together based on some shared key columns. For a key-value RDD, one can specify a partitioner so that data points with the same key are shuffled to the same executor, which makes the join more efficient (if there are shuffle-related operations before the join). Can the same thing be done with Spark DataFrames or Datasets?
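(For reference, this is the RDD-level pattern the question describes; a minimal sketch assuming a spark-shell `sc` and toy data, not part of the original question.)

import org.apache.spark.HashPartitioner

// two key-value RDDs keyed by userId (toy data)
val left  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val right = sc.parallelize(Seq((1, "NYC"), (2, "LA")))

// pre-partition both sides with the same partitioner so the join
// itself doesn't need to shuffle them again
val byKey = new HashPartitioner(100)
val joined = left.partitionBy(byKey).join(right.partitionBy(byKey))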

Asked Jan 09 '18 by Rainfield

People also ask

How many partitions should I use in Spark?

The general recommendation for Spark is to have about 4x as many partitions as there are cores available to the application in the cluster; as an upper bound, each task should take at least 100 ms to execute.
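A rough way to apply that rule of thumb (illustrative only; the config key is real, but the sizing is just the guideline above):

// use the cluster's default parallelism as a proxy for available cores
val cores = spark.sparkContext.defaultParallelism
spark.conf.set("spark.sql.shuffle.partitions", (cores * 4).toString)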

How do you optimize the join in Spark?

Step 1, shuffling: the data from the joined tables is partitioned by the join key, shuffling rows across partitions so that records with the same join key end up in the same partition. Step 2, hash join: a classic single-node hash join is then performed on the data within each partition.
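On Spark 3.0+ you can also nudge the optimizer toward this strategy with a join hint (a sketch; df1, df2 and "key" are assumed names):

// ask for a shuffle hash join instead of the default sort-merge join
val result = df1.join(df2.hint("SHUFFLE_HASH"), "key")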

When should I use partition in Spark?

Spark/PySpark partitioning is a way to split the data into multiple partitions so that transformations can run on those partitions in parallel, which lets the job complete faster. You can also write partitioned data to a file system (as multiple sub-directories) for faster reads by downstream systems.
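For example, a partitioned write looks like this (a sketch; the events DataFrame, the date column and the output path are assumed names):

// one sub-directory per distinct date, which downstream readers can prune
events.write.partitionBy("date").parquet("/path/to/events_by_date")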

How do I speed up a cross join in Spark?

To make the computation faster, reduce the number of partitions of the input DataFrames before the cross join, so that the resulting cross-joined DataFrame doesn't have too many partitions.
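A minimal sketch of that advice (df1, df2 and the partition counts are assumed; the cross-joined output can otherwise end up with roughly the product of the two inputs' partition counts):

val crossed = df1.coalesce(8).crossJoin(df2.coalesce(8))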


1 Answer

You can repartition a DataFrame after loading it if you know you'll be joining it multiple times:

import spark.implicits._ // enables the 'userId symbol-to-Column syntax

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

So the data is shuffled once, and that shuffle output is reused by the subsequent joins.

However, if you know you'll be repeatedly shuffling data on certain keys, your best bet is to save the data as bucketed tables. This writes the data out already hash-partitioned by those keys, so when you read the tables back in and join them you avoid the shuffle. You can do so as follows:

// you need to pick a number of buckets that makes sense for your data
users.write.bucketBy(50, "userId").saveAsTable("users")
addresses.write.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned

To avoid the shuffle, both tables have to use the same bucketing (i.e., the same number of buckets, joining on the bucket columns).
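One way to confirm the shuffle is actually gone (not part of the original answer, just a common check) is to inspect the physical plan:

joined.explain()
// with matching buckets on both sides there should be no Exchange above the table scans;
// a Sort may still appear unless the tables were also written with sortBy on the key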

Answered Sep 22 '22 by Silvio