I'm loading tens of thousands of gzipped files from S3 for my Spark job. This results in some partitions being very small (tens of records) and some very large (tens of thousands of records). The partition sizes are fairly evenly distributed across nodes, so each executor seems to be working on roughly the same amount of data in aggregate, and I'm not sure I even have a problem.
How would I know if it's worth repartitioning or coalescing the RDD? Will either of these be able to balance the partitions without shuffling data? Also, the RDD will not be reused, just mapped over and then joined to another RDD.
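A quick way to see whether the skew actually matters is to count the records in each partition directly. A minimal sketch in Scala, assuming the gzipped files have already been loaded into an RDD called input (the name is a placeholder):

```scala
// Inspect the record count of each partition of an already-loaded RDD.
// `input` is a placeholder for the RDD built from the gzipped S3 files.
val sizes = input
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()

sizes.sortBy(-_._2).take(10).foreach { case (idx, n) =>
  println(s"partition $idx -> $n records")
}
println(s"min=${sizes.map(_._2).min}, max=${sizes.map(_._2).max}, partitions=${sizes.length}")
```

Since gzipped files are not splittable, each file typically becomes its own partition, so these counts will mirror the per-file record counts.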
Interesting question. Between coalescing and repartitioning, coalescing is the better choice here because it does not trigger a full shuffle. Coalescing is generally recommended when data is sparse across partitions (say, after a filter); your scenario is similar, it just arises straight from the initial load. More importantly, coalescing is probably worth it for you because of what you do with the RDD after the load, namely the join.
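To make the distinction concrete, here is a minimal sketch (the target of 500 partitions is purely illustrative):

```scala
// coalesce(n) merges existing partitions without a full shuffle: records stay
// where they are, so it is cheap, but it cannot split an oversized partition.
val coalesced = input.coalesce(500)

// repartition(n) is shorthand for coalesce(n, shuffle = true): it performs a
// full shuffle, which balances partitions evenly but moves data over the network.
val repartitioned = input.repartition(500)
```

So, to your question about balancing without shuffling: coalesce avoids the shuffle but only groups whole partitions together; getting genuinely even partitions requires the shuffle that repartition performs.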
When the join triggers a shuffle of your loaded RDD, Spark consults the shuffle manager to decide which shuffle implementation to use (configured through spark.shuffle.manager). There are two implementations: hash (the default for versions < 1.2.0) and sort (the default for versions >= 1.2.0).
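For reference, the setting is an ordinary configuration entry. A sketch for the pre-2.0 Spark versions discussed here (in later releases the hash manager was removed and the setting is gone):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Applies to older Spark versions (< 2.0); in Spark 2.0+ the hash shuffle
// manager was removed and spark.shuffle.manager no longer needs setting.
val conf = new SparkConf()
  .setAppName("gzipped-s3-join")          // illustrative app name
  .set("spark.shuffle.manager", "sort")   // or "hash" on versions < 1.2.0

val sc = new SparkContext(conf)
```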
If the hash implementation is used, each input partition creates output files to send to the corresponding reducers where the join will take place. This can cause a huge blow-up in the number of files, which can be mitigated by setting spark.shuffle.consolidateFiles to true, but it can still leave you with a pretty slow join when there are many input partitions. With this implementation, coalescing is definitely worth it, since a large number of input partitions yields an unwieldy number of files to reduce out of.
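To see why the blow-up matters, here is a back-of-the-envelope calculation (the partition counts below are purely illustrative, not taken from the question):

```scala
// Under the hash shuffle manager without consolidation, roughly one file is
// written per (map partition, reduce partition) pair.
val mapPartitions    = 40000   // e.g. one per gzipped input file (illustrative)
val reducePartitions = 200     // e.g. the join's parallelism (illustrative)
println(s"~${mapPartitions.toLong * reducePartitions} shuffle files without coalescing")

val coalescedMap = 500
println(s"~${coalescedMap.toLong * reducePartitions} shuffle files after coalesce(500)")
```

Setting spark.shuffle.consolidateFiles to true lets map tasks running on the same executor core share output files, which reduces the file count but not the amount of data shuffled.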
If the sort implementation is used, there is only one output file per partition (whew!), and the file is indexed so that reducers can grab their keys from their respective offsets. However, with many input partitions, Spark still has to read from every one of them to gather all possible keys. With this implementation, coalescing may still be worth it, since performing that seek-and-read against every partition can also be costly.
If you do end up coalescing, the number of partitions to coalesce to is something you will probably have to tune, since the coalesce becomes a step in your execution plan; that step could, however, save you a very costly join. As a side note, this post is very helpful in explaining the implementation behind shuffling.
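Putting it together, the overall shape might look something like this sketch; the path, partition count, key extraction, and the other RDD are all placeholders rather than anything from the question:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical end-to-end shape: load, coalesce, map to key/value pairs, join.
val sc = new SparkContext(new SparkConf().setAppName("gzipped-s3-join"))

def extractKey(line: String): String = line.split('\t')(0)   // placeholder key logic

val input = sc.textFile("s3a://my-bucket/some/prefix/*.gz")  // one partition per .gz file
  .coalesce(500)                                             // tune this number

val mapped = input.map(line => (extractKey(line), line))

// `other` stands in for the second keyed RDD mentioned in the question.
val other: org.apache.spark.rdd.RDD[(String, String)] =
  sc.parallelize(Seq(("someKey", "someValue")))

val joined = mapped.join(other)
```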