I have an application with around 10 flat files, each containing more than 200MM records. The business logic involves joining all of them sequentially.
My environment: 1 master and 3 slaves (for testing I have assigned 1GB of memory to each node).
Most of the code just does the following for each join:
RDD1 = sc.textFile(file1).mapToPair(..)
RDD2 = sc.textFile(file2).mapToPair(..)
join = RDD1.join(RDD2).map(peopleObject)
Any suggestions for tuning, such as repartitioning or parallelizing? If so, are there any best practices for coming up with a good number of partitions?
With the current config the job takes more than an hour, and I see that the shuffle write for almost every file is > 3GB.
In practice, with large datasets (5 datasets, 100G+ each), I have seen that joins work best when you co-partition all the RDDs involved in a series of joins before you start joining them:
RDD1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(2048))
RDD2 = sc.textFile(file2).mapToPair(..).partitionBy(new HashPartitioner(2048))
.
.
.
RDDN = sc.textFile(fileN).mapToPair(..).partitionBy(new HashPartitioner(2048))
//start joins
RDD1.join(RDD2)...join(RDDN)
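To see why co-partitioning helps, here is a minimal plain-Java sketch (no Spark dependency) of the idea. It assumes non-null keys and unique keys per dataset, and the class and method names (`CoPartitionSketch`, `partitionOf`, `partitionBy`, `joinCoPartitioned`) are made up for illustration. `partitionOf` uses a non-negative modulo of `hashCode()`, which is the rule Spark's HashPartitioner applies to non-null keys. Because both sides use the same rule and the same partition count, a key always lands in the same bucket index on both sides, so each bucket can be joined locally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoPartitionSketch {

    // Non-negative modulo of hashCode(); assumes key is not null.
    static int partitionOf(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Stand-in for partitionBy: split a keyed dataset into numPartitions buckets.
    static List<Map<String, String>> partitionBy(Map<String, String> data, int numPartitions) {
        List<Map<String, String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : data.entrySet()) {
            parts.get(partitionOf(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        return parts;
    }

    // Join bucket i of the left side only with bucket i of the right side.
    // No record has to move between buckets, which is the point of co-partitioning.
    static Map<String, String> joinCoPartitioned(List<Map<String, String>> left,
                                                 List<Map<String, String>> right) {
        Map<String, String> out = new HashMap<>();
        for (int i = 0; i < left.size(); i++) {
            for (Map.Entry<String, String> e : left.get(i).entrySet()) {
                String other = right.get(i).get(e.getKey());
                if (other != null) {
                    out.put(e.getKey(), e.getValue() + "|" + other);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, only for illustration.
        Map<String, String> people = new HashMap<>();
        people.put("k1", "alice");
        people.put("k2", "bob");
        people.put("k3", "carol");
        Map<String, String> cities = new HashMap<>();
        cities.put("k1", "Oslo");
        cities.put("k3", "Lima");
        cities.put("k9", "Rome");

        Map<String, String> joined =
                joinCoPartitioned(partitionBy(people, 4), partitionBy(cities, 4));
        System.out.println(joined); // contains k1 and k3 only
    }
}
```

In real Spark code the buckets live on different executors, so "joined locally" means the join stage does not need another shuffle of data that is already partitioned this way.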
If we are always joining one RDD (say rdd1) with all the others, the idea is to partition that RDD and then persist it.
Here is a pseudo-Scala implementation (it can easily be converted to Python or Java):
val rdd1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(200)).cache()
Up to here, rdd1 is hash-partitioned into 200 partitions. The first time it is evaluated, it will be persisted (cached).
Now let's read two more rdds and join them.
val rdd2 = sc.textFile(file2).mapToPair(..)
val join1 = rdd1.join(rdd2).map(peopleObject)
val rdd3 = sc.textFile(file3).mapToPair(..)
val join2 = rdd1.join(rdd3).map(peopleObject)
Note that we neither partition nor cache the remaining RDDs.
Spark will see that rdd1 is already hash-partitioned and will use the same partitioning for all the remaining joins. So rdd2 and rdd3 will shuffle their keys to the locations where the matching keys of rdd1 already are.
To make this clearer, assume we skip the partitioning and use the code shown in the question: each time we do a join, both RDDs are shuffled. This means that with N joins against rdd1, the non-partitioned version shuffles rdd1 N times, while the partitioned approach shuffles rdd1 just once.
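The difference can be put into rough numbers. The following is a back-of-the-envelope sketch in plain Java, not a measurement: it assumes the shuffled volume is simply the size of each RDD that has to move, and it ignores compression, serialization overhead and cache evictions. The 3GB sizes are illustrative (borrowed from the shuffle write mentioned in the question), and the class and method names are hypothetical.

```java
public class ShuffleCostSketch {

    // No known partitioner on either side: every join moves rdd1 and the other RDD.
    static long unpartitionedShuffleBytes(long rdd1Bytes, long[] otherBytes) {
        long total = 0;
        for (long other : otherBytes) {
            total += rdd1Bytes + other;
        }
        return total;
    }

    // rdd1 is shuffled once by partitionBy and then cached;
    // each join afterwards moves only the other RDD.
    static long prePartitionedShuffleBytes(long rdd1Bytes, long[] otherBytes) {
        long total = rdd1Bytes;
        for (long other : otherBytes) {
            total += other;
        }
        return total;
    }

    public static void main(String[] args) {
        final long GB = 1L << 30;
        long rdd1 = 3 * GB;                      // illustrative size
        long[] others = {3 * GB, 3 * GB, 3 * GB}; // three joins against rdd1

        System.out.println("without partitionBy: "
                + unpartitionedShuffleBytes(rdd1, others) / GB + " GB");      // 18 GB
        System.out.println("with partitionBy + cache: "
                + prePartitionedShuffleBytes(rdd1, others) / GB + " GB");     // 12 GB
    }
}
```

Under these assumptions the saving grows with the number of joins: with N joins the unpartitioned version moves rdd1 N times, the pre-partitioned one only once.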