I'm working through these two concepts right now and would like some clarity. From working through the command line, I've been trying to identify the differences and when a developer would use repartition vs partitionBy.
Here is some sample code:
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)
rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]
rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]
I took a look at the implementation of both, and the only difference I've noticed for the most part is that partitionBy can take a partitioning function, or using the portable_hash by default. So in partitionBy, all the same keys should be in the same partition. In repartition, I would expect the values to be distributed more evenly over the partitions, but this isnt the case.
Given this, why would anyone ever use repartition? I suppose the only time I could see it being used is if I'm not working with PairRDD, or I have large data skew?
Is there something that I'm missing, or could someone shed light from a different angle for me?
repartition() is used for specifying the number of partitions considering the number of cores and the amount of data you have. partitionBy() is most importantly used for making shuffling functions more efficient, such as reduceByKey(), join(), cogroup() etc..
Spark repartition() vs coalesce() – repartition() is used to increase or decrease the RDD, DataFrame, Dataset partitions whereas the coalesce() is used to only decrease the number of partitions in an efficient way.
PySpark partitionBy() is used to partition based on column values while writing DataFrame to Disk/File system. When you write DataFrame to Disk by calling partitionBy() Pyspark splits the records based on the partition column and stores each partition data into a sub-directory.
One difference I get is that with repartition() the number of partitions can be increased/decreased, but with coalesce() the number of partitions can only be decreased. If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?
repartition()
is used for specifying the number of partitions considering the number of cores and the amount of data you have.
partitionBy()
is used for making shuffling functions more efficient, such as reduceByKey()
, join()
, cogroup()
etc.. It is only beneficial in cases where a RDD is used for multiple times, so it is usually followed by persist()
.
Differences between the two in action:
pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))
pairs.partitionBy(3).glom().collect()
[[(3, 3), (6, 6), (6, 6)],
[(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
[(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]
pairs.repartition(3).glom().collect()
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
[(1, 1), (4, 4), (6, 6), (4, 4)],
[(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]
repartition
already exists in RDDs, and does not handle partitioning by key (or by any other criterion except Ordering). Now PairRDDs add the notion of keys and subsequently add another method that allows to partition by that key.
So yes, if your data is keyed, you should absolutely partition by that key, which in many cases is the point of using a PairRDD in the first place (for joins, reduceByKey, and so on).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With