
Spark RDD - avoiding shuffle - Does partitioning help to process huge files?

I have an application with around 10 flat files, each containing more than 200 million records. The business logic involves joining all of them sequentially.

My environment: 1 master, 3 slaves (for testing I have assigned 1 GB of memory to each node).

Most of the code just does the following for each join:

 RDD1 = sc.textFile(file1).mapToPair(..)

 RDD2 = sc.textFile(file2).mapToPair(..) 

 join = RDD1.join(RDD2).map(peopleObject)

Any suggestions for tuning, like repartitioning or adjusting parallelism? If so, are there any best practices for coming up with a good number of partitions?

With the current config the job takes more than an hour, and I see that the shuffle write for almost every file is > 3 GB.

asked May 29 '16 by sve


2 Answers

In practice, with large datasets (5, 100 GB+ each), I have seen that a join works best when you co-partition all the RDDs involved in a series of joins before you start joining them:

RDD1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(2048))

RDD2 = sc.textFile(file2).mapToPair(..).partitionBy(new HashPartitioner(2048)) 
.
.
.
RDDN = sc.textFile(fileN).mapToPair(..).partitionBy(new HashPartitioner(2048)) 

//start joins

RDD1.join(RDD2)...join(RDDN)


Side note: I refer to this kind of join as a tree join (each RDD is used once). The rationale is illustrated by the following picture taken from the Spark UI:

[Spark UI screenshot of the tree join's DAG]
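For reference, here is a minimal, self-contained Java sketch of the same idea, since the question's code uses the Java API. The file paths, the tab-separated layout, and the keyed(..) parsing helper are assumptions made for illustration; the part taken from this answer is keying every input and partitioning it with one shared HashPartitioner(2048) before the chain of joins. partitionBy still shuffles each input once, but after that none of the joins needs to re-shuffle, because every side (including each intermediate join result, which keeps the partitioner) is already laid out on the same partitions.

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class CoPartitionedJoin {

    // Hypothetical parser: assumes "key<TAB>value" lines; stands in for the real mapToPair(..).
    static JavaPairRDD<String, String> keyed(JavaSparkContext sc, String path) {
        return sc.textFile(path).mapToPair(line -> {
            String[] cols = line.split("\t", 2);
            return new Tuple2<>(cols[0], cols[1]);
        });
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("co-partitioned-join"));

        // One shared partitioner: every RDD is keyed and laid out the same way up front,
        // so the joins below do not need to re-shuffle both sides.
        HashPartitioner partitioner = new HashPartitioner(2048);

        JavaPairRDD<String, String> rdd1 = keyed(sc, "hdfs:///data/file1").partitionBy(partitioner);
        JavaPairRDD<String, String> rdd2 = keyed(sc, "hdfs:///data/file2").partitionBy(partitioner);
        JavaPairRDD<String, String> rdd3 = keyed(sc, "hdfs:///data/file3").partitionBy(partitioner);

        // "Tree join": each RDD is used exactly once; each intermediate result keeps the
        // partitioner, so the next join is shuffle-free as well.
        JavaPairRDD<String, Tuple2<Tuple2<String, String>, String>> joined = rdd1.join(rdd2).join(rdd3);

        System.out.println("joined records: " + joined.count());
        sc.stop();
    }
}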

answered by axiom


If we are always joining one RDD (say rdd1) with all the others, the idea is to partition that RDD and then persist it.

Here is a pseudo-Scala implementation (it can easily be converted to Python or Java):

val rdd1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(200)).cache()

Up to this point, rdd1 is hash-partitioned into 200 partitions. The first time it gets evaluated, it will be persisted (cached).

Now let's read two more RDDs and join them with it.

val rdd2 = sc.textFile(file2).mapToPair(..) 
val join1 = rdd1.join(rdd2).map(peopleObject)
val rdd3 = sc.textFile(file3).mapToPair(..) 
val join2 = rdd1.join(rdd3).map(peopleObject)

Note that we neither partition nor cache the remaining RDDs.

Spark will see that rdd1 is already hash-partitioned and will use that same partitioning for all the remaining joins. So rdd2 and rdd3 will shuffle their keys to the locations where the matching keys of rdd1 already reside.

To make this clearer, assume we don't partition and instead use the same code shown in the question: each time we do a join, both RDDs are shuffled. This means that with N joins against rdd1, the unpartitioned version shuffles rdd1 N times, while the partitioned approach shuffles rdd1 just once.
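Since the answer notes the pseudo-Scala converts easily to Java, here is one hedged Java rendering of it. The keyed(..) CSV parsing helper, the file paths, and the string-concatenating stand-in for map(peopleObject) are placeholders; the essential part, taken from the answer, is partitioning rdd1 once with HashPartitioner(200), caching it, and leaving rdd2 and rdd3 alone so only their side of each join gets shuffled.

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionOnceJoin {

    // Hypothetical parser: assumes "key,value" lines; stands in for mapToPair(..).
    static JavaPairRDD<String, String> keyed(JavaSparkContext sc, String path) {
        return sc.textFile(path).mapToPair(line -> {
            String[] cols = line.split(",", 2);
            return new Tuple2<>(cols[0], cols[1]);
        });
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("partition-once-join"));

        // rdd1 is hash-partitioned into 200 partitions and cached, so it is shuffled
        // and materialised only once, no matter how many joins reuse it.
        JavaPairRDD<String, String> rdd1 =
                keyed(sc, "hdfs:///data/file1").partitionBy(new HashPartitioner(200)).cache();

        // rdd2 and rdd3 are neither partitioned nor cached; each join shuffles only
        // their keys over to wherever rdd1's partitions already live.
        JavaPairRDD<String, String> rdd2 = keyed(sc, "hdfs:///data/file2");
        JavaRDD<String> join1 = rdd1.join(rdd2).map(t -> t._1() + ":" + t._2()); // stand-in for map(peopleObject)

        JavaPairRDD<String, String> rdd3 = keyed(sc, "hdfs:///data/file3");
        JavaRDD<String> join2 = rdd1.join(rdd3).map(t -> t._1() + ":" + t._2());

        System.out.println(join1.count() + " / " + join2.count());
        sc.stop();
    }
}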

answered by marios