There are indications that joins in Spark are implemented on top of the cogroup transformation. So let me focus on cogroup first: it returns an RDD that contains essentially ALL elements of the cogrouped RDDs. Put another way, every key that appears in any of the cogrouped RDDs shows up in the result, even if only one of those RDDs has elements for that key.
That would mean that when smaller, streaming RDDs (e.g. the RDDs of a JavaPairDStream) keep getting joined with a much larger batch RDD, RAM is allocated for multiple instances of the resulting (cogrouped) RDD, each of which is essentially the large batch RDD plus some more. Obviously the RAM is returned when the DStream RDDs are discarded, which happens on a regular basis, but it still looks like an unnecessary spike in RAM consumption.
I have two questions:
Is there any way to control the cogroup process more "precisely", e.g. to tell it to include in the cogrouped RDD only those keys for which there is at least one element from EACH of the cogrouped RDDs? Based on the current cogroup API, this is not possible.
If cogroup really is such a sledgehammer, and joins are based on cogroup, then even though joins present a prettier picture in terms of the end result visible to the user, does that mean the same atrocious RAM consumption is still going on under the hood?
It is not that bad. It largely depends on the granularity of your partitioning. Cogroup first shuffles by key, on disk, to distinct executor nodes. For each key, yes, the entire set of elements with that key, from both RDDs, is loaded into RAM and handed to you. But not all keys need to be in RAM at any given time, so unless your data is heavily skewed you will not suffer much from it.
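On the first question, it may help to see the semantics spelled out. The sketch below is a plain-Python model, not Spark code: `cogroup`, `inner_join`, `batch` and `stream` are names made up for illustration, and ordinary lists stand in for RDDs. It shows that restricting a cogroup to keys present on both sides is just a filter on the grouped result, which, as far as I know, is roughly how Spark's pair-RDD `join` is derived from `cogroup`.

```python
from collections import defaultdict
from itertools import product


def cogroup(left, right):
    """Model of cogroup: every key from either side maps to (left_values, right_values)."""
    grouped = defaultdict(lambda: ([], []))
    for key, value in left:
        grouped[key][0].append(value)
    for key, value in right:
        grouped[key][1].append(value)
    return dict(grouped)


def inner_join(left, right):
    """Inner join built on cogroup: keys missing on either side produce no pairs."""
    return {
        key: list(product(left_values, right_values))
        for key, (left_values, right_values) in cogroup(left, right).items()
        if left_values and right_values
    }


# Hypothetical data: a larger "batch" side and one small "streaming" micro-batch.
batch = [("a", 1), ("b", 2), ("c", 3)]
stream = [("b", 20), ("d", 40)]

cogrouped = cogroup(batch, stream)   # contains keys a, b, c and d
joined = inner_join(batch, stream)   # contains only key b
```

In this model the grouping step still touches every key of the large side; only the output shrinks. Whether that matters in practice comes down to the per-key, per-partition behaviour described above rather than to the shape of the final result.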