I am joining two RDDs, rddA and rddB. rddA has 100 partitions and rddB has 500 partitions.
I am trying to understand the mechanics of the join operation. By default, regardless of the order of the join, I end up with the same partition structure; i.e. rddA.join(rddB) and rddB.join(rddA) yield the same number of partitions, and by observation it uses the smaller partition count, 100. I am aware that I can increase the number of partitions by using rddA.join(rddB, 500), but I am more interested in what takes place under the hood and why the lower count is chosen. From observation, even if I re-partition the smaller RDD, its partitioning will still be used; does Spark do any heuristic analysis regarding the key size?
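For reference, here is a minimal, self-contained sketch of the kind of thing I am running; the data and object name are made up, and only the 100/500 partition counts and the join calls match my real job:

    import org.apache.spark.{SparkConf, SparkContext}

    object JoinPartitionsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("join-partitions-sketch").setMaster("local[*]"))

        // Made-up pair RDDs; only the 100/500 partition counts mirror my real job.
        val rddA = sc.parallelize(1 to 10000, 100).map(k => (k % 50, k))
        val rddB = sc.parallelize(1 to 100000, 500).map(k => (k % 50, k.toString))

        println(rddA.join(rddB).getNumPartitions)       // same result in both orders in my runs
        println(rddB.join(rddA).getNumPartitions)
        println(rddA.join(rddB, 500).getNumPartitions)  // explicit numPartitions forces 500

        sc.stop()
      }
    }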
Another problem I have is the level of skew I get. My smallest partition ends up with 3,314 entries and the largest ends up with 1,139,207, out of a total of 599,911,729 keys. Both RDDs use the default partitioner, so how is the data shuffle decided?
I vaguely recall reading that if one RDD has a partitioner set, then it is that partitioner that will be used. Is this the case? Is it "recommended" to do this?
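To make that part of the question concrete, this is roughly what I mean by "setting a partitioner" beforehand (again with made-up data; whether the join actually reuses it is exactly what I am asking):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object PrePartitionedJoinSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("pre-partitioned-join-sketch").setMaster("local[*]"))

        val rddA = sc.parallelize(1 to 10000, 100).map(k => (k % 50, k))

        // Explicitly set a partitioner on rddB; if my recollection is right,
        // the join should reuse it and only rddA's side would need to move.
        val rddB = sc.parallelize(1 to 100000, 500)
          .map(k => (k % 50, k.toString))
          .partitionBy(new HashPartitioner(500))
          .cache()

        val joined = rddA.join(rddB)
        println(joined.getNumPartitions)  // would be 500 if rddB's partitioner is picked up
        println(joined.partitioner)       // Some(...) would indicate the partitioner was reused

        sc.stop()
      }
    }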
Finally, note that both of my RDDs are relatively big (~90 GB), so a broadcast join would not help. Instead, any way to gain some insight into the join operation would probably be the way to go.
PS. Any details on the mechanics of left and right joins would be an added bonus :)
Resource allocation and the tracking of jobs and tasks are handled by the cluster manager. As soon as you do a spark-submit, your user program and the configuration you provide are copied onto all the available nodes in the cluster, so the program can be read locally on every worker node.
As a result, data rows may move between worker nodes when a source partition and its target partition reside on different machines. Spark does not move data between nodes arbitrarily: shuffling is a time-consuming operation, so it happens only when there is no other option.
Since union doesn't move any data around, it is an efficient operation. If rdd1 has 10 partitions and rdd2 has 20 partitions, then rdd1.union(rdd2) will have 30 partitions: the partitions of the two RDDs placed after each other. This is simply a bookkeeping change; no shuffling is involved.
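A quick way to see this bookkeeping for yourself (the data and partition counts here are just illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object UnionPartitionsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("union-partitions-sketch").setMaster("local[*]"))

        val rdd1 = sc.parallelize(1 to 100, 10)  // 10 partitions
        val rdd2 = sc.parallelize(1 to 100, 20)  // 20 partitions

        // union just concatenates the parents' partition lists: no shuffle, 10 + 20 = 30.
        println(rdd1.union(rdd2).getNumPartitions)

        sc.stop()
      }
    }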
Although I have not yet managed to explain how partitioning is derived, I did find out how data are shuffled (which was my initial problem). A join has a few side-effects:
Shuffling/Partitioning: Spark will hash-partition the 'RDD' keys and move/distribute them among the 'Workers'. Each set of values for a given key (e.g. 5) will end up in a single 'Worker'/JVM. This means that if your 'join' has a 1..N relationship and N is heavily skewed, you will end up with skewed partitions and JVM heaps (i.e. one 'Partition' might hold Max(N) entries and another Min(N)). The only way to avoid this is to use a 'Broadcast' if possible, or to endure this behaviour. Since your data is initially evenly distributed, the amount of shuffling depends on the key hash (see the sketch after this list).
Re-partitioning: Following a "skewed" join, calling 'repartition' seems to redistribute the data evenly among partitions, so it is a good thing to do if you have unavoidable skew. Note, though, that this transformation triggers a heavy shuffle, although subsequent operations will be much faster. The downside is uncontrollable object creation (see below).
Object creation/Heap pollution: You managed to join your data and, thinking that repartitioning would be a good idea to re-balance your cluster, you call 'repartition', but for some reason it triggers an 'OOME'. What happens is that the originally joined data re-uses the joined objects. When you trigger 'repartition', or any other transformation that involves shuffling (e.g. an extra join or a 'groupBy', followed by an 'Action'), the data gets serialized, so you lose the object re-use. Once the objects are de-serialized they are new instances. Also note that the re-use is lost during serialization, so the shuffle will be quite heavy. So, in my case, a 1..1000000 join (where 1 is my 'heavy' object) will fail after any action that triggers a shuffle.
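Here is a small, self-contained sketch of the first two points above, using tiny made-up data where one "hot" key plays the role of the skewed N side; the per-partition counts before and after 'repartition' show the effect:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object JoinSkewSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("join-skew-sketch").setMaster("local[*]"))

        // Made-up data: key 0 is "hot" (the N side of the 1..N relationship),
        // every other key appears only once.
        val hot   = sc.parallelize(1 to 100000).map(v => (0, v))
        val rare  = sc.parallelize(1 to 100).map(v => (v, v))
        val left  = hot.union(rare)
        val right = sc.parallelize(0 to 100).map(k => (k, s"meta-$k"))

        // Hash partitioning sends all values of a key to one partition, so the
        // partition that owns key 0 gets ~100,000 joined rows while the rest stay tiny.
        val joined = left.join(right, new HashPartitioner(10))
        joined
          .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
          .collect()
          .foreach { case (idx, n) => println(s"joined partition $idx -> $n rows") }

        // A follow-up repartition spreads the rows roughly evenly again,
        // at the price of another full shuffle (and fresh object instances).
        joined.repartition(10)
          .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
          .collect()
          .foreach { case (idx, n) => println(s"repartitioned $idx -> $n rows") }

        sc.stop()
      }
    }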
Workarounds/Debug: