I have a huge table, and my Spark job keeps crashing on it. I want to repartition it. I have two columns (id, time), and I need to ensure that all rows with a given id are partitioned to the same worker. But I have hundreds of millions of unique IDs. I want PySpark to spread the data evenly, while respecting that all rows for a given ID stay on one worker. Can I simply do:
df.repartition("id")
The documentation seems to suggest so. But I am wondering whether Spark will now partition the job into hundreds of millions of subsets and only send one subset (i.e. the data of one id) to each worker at a time. That would of course be very inefficient.
I am using Spark 2.4.0-cdh6.2.1
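To make it concrete, here is a minimal sketch of what I am trying to verify (toy data; the id and time column names match my real table, but everything else is made up):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for my huge table: several rows per id, plus a time column.
df = spark.createDataFrame(
    [(i % 1000, t) for i in range(10000) for t in range(3)],
    ["id", "time"],
)

# What I want to do: partition by id so all rows of an id end up together.
repartitioned = df.repartition("id")

# Check that every id lands in exactly one partition.
check = (
    repartitioned
    .withColumn("part", F.spark_partition_id())
    .groupBy("id")
    .agg(F.countDistinct("part").alias("n_parts"))
)
print(check.filter("n_parts > 1").count())  # expect 0
```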
Let's use explain to see what Spark does when you call repartition:
>>> spark.range(20).repartition("id").explain()
== Physical Plan ==
Exchange hashpartitioning(id#0L, 200)
+- *(1) Range (0, 20, step=1, splits=8)
Exchange hashpartitioning(id#0L, 200) means shuffling the data into 200 partitions. The partition a row ends up in is determined by a hash of id modulo 200, so all rows with the same id land in the same partition. If your data is not skewed, the distribution should be pretty even.

200 is the default value of spark.sql.shuffle.partitions, which determines how many partitions are generated after a shuffle. To change that value to, say, 400, you can either change the config by doing spark.conf.set("spark.sql.shuffle.partitions", 400) or call repartition(400, "id"). Indeed, if you have a lot of data, 200 might not be enough.
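For example (a hedged sketch; df and the target of 400 partitions are just placeholders for your table and your preferred partition count), both approaches below give you 400 partitions keyed by id, and you can inspect how evenly the rows spread out afterwards:

```python
from pyspark.sql import functions as F

# Option 1: raise the default shuffle partition count, then repartition by id.
spark.conf.set("spark.sql.shuffle.partitions", 400)
by_id = df.repartition("id")

# Option 2: ask for 400 partitions explicitly on this one repartition.
by_id = df.repartition(400, "id")

# Inspect the distribution: number of rows that landed in each partition.
sizes = (
    by_id
    .withColumn("part", F.spark_partition_id())
    .groupBy("part")
    .count()
    .orderBy("part")
)
sizes.show(400)
```

If the counts per partition are roughly equal, the hash partitioning is doing its job; a few partitions that are much larger than the rest would point to skew in the id column rather than to the repartition call itself.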