How does Spark schedule a join?

I am joining two RDDs rddA and rddB.

rddA has 100 partitions and rddB has 500 partitions.

I am trying to understand the mechanics of the join operation. By default, regardless of the order of the join, I end up with the same partition structure; i.e. rddA.join(rddB) and rddB.join(rddA) yield the same number of partitions, and from observation it uses the smaller partition count, 100. I am aware that I can increase the number of partitions by using rddA.join(rddB, 500), but I am more interested in what takes place under the hood and why the lower count is chosen. From observation, even if I re-partition the small RDD, its partitioning will still be used; does Spark do any heuristic analysis regarding the key size?
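
To make this concrete, here is a minimal local sketch of the kind of setup I am describing (the toy data, the partitioner placement and the local master are illustrative, not my real job; only the partition counts match):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("join-test").setMaster("local[*]"))

    // rddA carries a HashPartitioner(100); rddB has 500 partitions but no partitioner
    val rddA = sc.parallelize(1 to 100000).map(i => (i % 1000, i))
                 .partitionBy(new HashPartitioner(100))
    val rddB = sc.parallelize(1 to 100000, 500).map(i => (i % 1000, i))

    println(rddA.join(rddB).partitions.length)      // 100: rddA's existing partitioner is reused
    println(rddB.join(rddA).partitions.length)      // 100: order makes no difference
    println(rddA.join(rddB, 500).partitions.length) // 500: only an explicit count overrides it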

Another problem I have is the level of skew I get. My smallest partition ends up with 3,314 entries and the largest ends up with 1,139,207, out of a total of 599,911,729 keys. Both RDDs are using the default partitioner, so how is the data shuffle decided? I vaguely recall reading that if one RDD has a partitioner set, then it is that partitioner that will be used. Is this the case? Is it "recommended" to do this?

Finally, note that both of my RDDs are relatively big (~90GB each), so a broadcast join would not help. Instead, any insight into what the join operation does under the hood would probably be the way to go.

PS. Any details on the mechanics of left and right joins would be an added bonus :)

asked May 23 '15 by Ioannis Deligiannis
People also ask

How does Apache spark work internally?

The entire resource allocation and the tracking of jobs and tasks are performed by the cluster manager. As soon as you do a Spark submit, your user program and the other configuration you specified are copied onto all the available nodes in the cluster, so that the program can be read locally on every worker node.

Does Spark require shuffling?

As a result, data rows can move between worker nodes when their source partition and target partition reside on different machines. Spark doesn't move data between nodes randomly: shuffling is a time-consuming operation, so it happens only when there is no other option.

Is Union a shuffle operation in Spark?

Since union doesn't move any data around, it is considered an efficient method. If rdd1 has 10 partitions and rdd2 has 20 partitions, then rdd1.union(rdd2) will have 30 partitions: the partitions of the two RDDs placed one after the other. This is simply a bookkeeping change; no shuffling is involved, as the sketch below illustrates.
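
A minimal sketch of that bookkeeping (the toy RDDs and local master are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("union-test").setMaster("local[*]"))

    val rdd1 = sc.parallelize(1 to 100, 10)      // 10 partitions
    val rdd2 = sc.parallelize(1 to 200, 20)      // 20 partitions

    // union concatenates the two partition lists; no shuffle is planned
    println(rdd1.union(rdd2).partitions.length)  // 30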


1 Answer

Although I have not yet managed to explain how partitioning is derived, I did find out how data are shuffled (which was my initial problem). A join has a few side-effects:

Shuffling/Partitioning: Spark will hash-partition the 'RDD' keys and move/distribute them among the 'Workers'. All values for a given key (e.g. 5) will end up in a single 'Worker'/JVM. This means that if your 'join' has a 1..N relationship and N is heavily skewed, you will end up with skewed partitions and JVM heaps (i.e. one 'Partition' might hold Max(N) values and another Min(N)). The only way to avoid this is to use a 'Broadcast' if possible, or to endure this behaviour. Since your data will initially be evenly distributed, the amount of shuffling will depend on the key hash.
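
A small sketch of the partitioner side of this (the partition count is arbitrary; a HashPartitioner is what a pair-RDD join falls back to when neither side already has a partitioner):

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(100)

    // Every occurrence of a key, on either side of the join, maps to the
    // same partition index, hence the same 'Worker'/JVM after the shuffle.
    println(p.getPartition(5))  // 5: Int keys hash to themselves, taken mod 100
    // So a key with 1,000,000 values puts all 1,000,000 rows in one partition.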

Re-partitioning: following a "skewed" join, calling 'repartition' seems to re-distribute the data evenly among partitions. So this is a good thing to do if you have unavoidable skew. Note though that this transformation triggers a heavy shuffle, but the operations that follow will be much faster. The downside to this is uncontrollable Object creation (see below).
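
As a sketch (reusing the question's rddA/rddB; the partition count is an assumption):

    // joined may be heavily skewed if a few keys carry most of the values
    val joined = rddA.join(rddB)

    // repartition does a full shuffle that ignores keys, so the 500 resulting
    // partitions come out roughly equal in size; note the result no longer has
    // a partitioner, i.e. values for one key may span several partitions
    val rebalanced = joined.repartition(500)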

Object creation/Heap pollution: you managed to join your data and think that repartitioning would be a good idea to re-balance your cluster, but for some reason 'repartition' triggers an 'OOME'. What happens is that the originally joined data re-uses joined Objects. When you trigger 'repartition', or any other 'Action' that involves shuffling (e.g. an extra join, or a 'groupBy' followed by an 'Action'), data gets serialized, so you lose the Object re-use. Once Objects are de-serialized, they are new instances. Also note that since the re-use is lost during serialization, the shuffle will be quite heavy. So, in my case, a 1..1000000 join (where 1 is my 'heavy' object) will fail following any action that triggers a shuffle.

Workarounds/Debug:

  1. I used 'mapPartitionsWithIndex' to debug partition sizes by returning a single-item 'Iterable' with the count of each partition (see the first sketch after this list). This is very useful, as you can see the effect of 'repartition' and the state of your partitions after an 'Action'.
  2. You can use 'countByKeyApprox' or 'countByKey' on your join RDDs to get a feel for the cardinality, and then apply the join in two steps: use a 'Broadcast' for your high-cardinality keys and a 'join' for the low-cardinality keys (see the second sketch after this list). Wrapping these operations in an 'rdd.cache()' & 'rdd.unpersist()' block will speed this process up significantly. Though this might complicate your code a little, it will provide much better performance, especially if you do subsequent operations. Also note that if you use the 'Broadcast' in every 'map' to do a lookup, you will also significantly reduce the shuffle size.
  3. Calling 'repartition' or other operations that affect the number of partitions can be very useful, but be aware that a (randomly chosen) large number of partitions will cause more skew, as your large sets for a given key will create large partitions while the other partitions will have a small size or 0. Creating a debug method to get the size of each partition will help you pick a good size.
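
A sketch of the debug helper from point 1 (the method name is mine, not from the original answer):

    import org.apache.spark.rdd.RDD

    // Returns one (partitionIndex, recordCount) pair per partition.
    // collect() is safe here: the result is one tiny record per partition.
    def partitionSizes[T](rdd: RDD[T]): Array[(Int, Long)] =
      rdd.mapPartitionsWithIndex { (idx, it) =>
        var n = 0L
        it.foreach(_ => n += 1)
        Iterator((idx, n))
      }.collect()

    // e.g. inspect skew before and after a repartition
    partitionSizes(rddA.join(rddB)).foreach(println)
    partitionSizes(rddA.join(rddB).repartition(500)).foreach(println)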
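
And a sketch of the two-step join from point 2. The 'threshold' value is an assumption to tune per dataset, and this version assumes the non-skewed side (rddB here) has a single value per hot key, so its hot slice fits in a broadcast map:

    val threshold = 100000L  // assumption: keys above this count are "hot"

    rddA.cache()  // rddA is scanned several times below

    // Step 0: find the hot keys on the skewed side (driver-side Map[K, Long])
    val hotKeys = rddA.countByKey()
      .filter { case (_, n) => n > threshold }
      .keySet

    // Step 1: broadcast rddB's rows for the hot keys, then "join" them map-side
    val hotB = sc.broadcast(
      rddB.filter { case (k, _) => hotKeys.contains(k) }.collect().toMap)

    val hotJoined = rddA
      .filter { case (k, _) => hotKeys.contains(k) }
      .flatMap { case (k, a) => hotB.value.get(k).map(b => (k, (a, b))) }

    // Step 2: ordinary shuffle join for the well-behaved long tail
    val coldJoined = rddA
      .filter { case (k, _) => !hotKeys.contains(k) }
      .join(rddB.filter { case (k, _) => !hotKeys.contains(k) })

    val result = hotJoined.union(coldJoined)
    rddA.unpersist()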
answered Sep 25 '22 by Ioannis Deligiannis