Pyspark: repartition vs partitionBy

I'm working through these two concepts right now and would like some clarity. Experimenting in the shell, I've been trying to identify the differences between them and when a developer would use repartition vs partitionBy.

Here is some sample code:

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c', 1), ('ef', 5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)

rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]

rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]

I took a look at the implementation of both, and the only difference I've noticed for the most part is that partitionBy can take a partitioning function, or portable_hash by default. So with partitionBy, all identical keys end up in the same partition. With repartition, I would expect the values to be distributed more evenly over the partitions, but this isn't the case.
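
For reference, here is a minimal sketch of passing a custom partition function to partitionBy (the lambda below is a hypothetical example; partitionFunc defaults to portable_hash):

# Hypothetical partitioner: place keys by the code point of their first
# character instead of hashing the whole key.
rdd3 = rdd.partitionBy(4, partitionFunc=lambda key: ord(key[0]))
rdd3.glom().collect()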

Given this, why would anyone ever use repartition? I suppose the only time I could see it being used is if I'm not working with a PairRDD, or if I have large data skew?

Is there something that I'm missing, or could someone shed light from a different angle for me?

asked Nov 20 '15 by Joe Widen

People also ask

What is difference between repartition and partitionBy in Spark?

repartition() is used to specify the number of partitions, considering the number of cores and the amount of data you have. partitionBy() is mainly used to make shuffle-based functions such as reduceByKey(), join(), and cogroup() more efficient.

What is the difference between repartition and coalesce in PySpark?

Spark repartition() vs coalesce(): repartition() is used to increase or decrease the number of partitions of an RDD, DataFrame, or Dataset, whereas coalesce() only decreases the number of partitions, doing so more efficiently by avoiding a full shuffle.
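
A minimal sketch of the difference, assuming an existing SparkContext named sc:

rdd = sc.parallelize(range(100), 8)     # start with 8 partitions
rdd.coalesce(2).getNumPartitions()      # 2: merges partitions, no full shuffle
rdd.repartition(16).getNumPartitions()  # 16: full shuffle, can also increase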

What does partitionBy do in PySpark?

The DataFrame writer's partitionBy() is used to partition output by column values when writing a DataFrame to the disk/file system. When you write a DataFrame to disk with partitionBy(), PySpark splits the records on the partition column and stores each partition's data in its own sub-directory.
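
A minimal sketch, assuming an existing SparkSession named spark (the column name and output path are made up for illustration):

df = spark.createDataFrame([('a', 1), ('a', 2), ('b', 3)], ['key', 'value'])
df.write.partitionBy('key').parquet('/tmp/partitioned_out')
# Writes sub-directories such as /tmp/partitioned_out/key=a/ and .../key=b/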

Which is better repartition or coalesce?

One difference I understand is that with repartition() the number of partitions can be increased or decreased, but with coalesce() the number of partitions can only be decreased. If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?


2 Answers

repartition() is used for specifying the number of partitions considering the number of cores and the amount of data you have.

partitionBy() is used to make shuffle-based functions such as reduceByKey(), join(), and cogroup() more efficient. It is only beneficial when an RDD is used multiple times, so it is usually followed by persist().
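
A minimal sketch of that pattern, assuming an existing SparkContext named sc:

kv = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
partitioned = kv.partitionBy(3).persist()  # shuffle once, cache the layout
# Subsequent key-based operations with the same partitioner skip the shuffle:
partitioned.reduceByKey(lambda a, b: a + b, 3).collect()
partitioned.reduceByKey(max, 3).collect()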

Differences between the two in action:

pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))

pairs.partitionBy(3).glom().collect()
[[(3, 3), (6, 6), (6, 6)],
 [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
 [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]

pairs.repartition(3).glom().collect()
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
 [(1, 1), (4, 4), (6, 6), (4, 4)],
 [(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]
answered Oct 20 '22 by Hui Guo


repartition already exists on plain RDDs and does not partition by key (or by any criterion other than Ordering). PairRDDs add the notion of keys, and with it another method, partitionBy, that allows partitioning by that key.

So yes, if your data is keyed, you should absolutely partition by that key, which in many cases is the point of using a PairRDD in the first place (for joins, reduceByKey, and so on).
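
For example, a minimal sketch of a co-partitioned join, assuming an existing SparkContext named sc:

left = sc.parallelize([('a', 1), ('b', 2)]).partitionBy(4)
right = sc.parallelize([('a', 'x'), ('b', 'y')]).partitionBy(4)
# Both sides share the same partitioner, so matching keys are already
# co-located and the join can avoid a full shuffle.
left.join(right, 4).collect()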

answered Oct 20 '22 by Marius Soutier