Spark: Find Each Partition Size for RDD

What's the best way to find the size of each partition of a given RDD? I'm trying to debug a skewed partition issue, and I've tried this:

l = builder.rdd.glom().map(len).collect()  # get the length of each partition
print('Min Partition Size:', min(l), 'Max Partition Size:', max(l), 'Avg Partition Size:', sum(l)/len(l), 'Total Partitions:', len(l))

This works fine for small RDDs, but on bigger RDDs it throws an OOM error. My guess is that glom() is the cause, since it materializes every partition as a list. Is there a better way to do this?

anwartheravian asked Dec 09 '16

People also ask

How many partitions does an RDD have?

By default, one partition is created for each HDFS block of the file (64 MB in older HDFS versions). However, when creating an RDD you can pass a second argument that sets the number of partitions; for example, textFile = sc.textFile(path, 5) creates an RDD named textFile with 5 partitions.

How do I determine partition size?

Each partition should be smaller than about 200 MB for good performance. As a rule of thumb, the number of partitions should be 1x to 4x the number of cores in your cluster, so sizing the cluster to match your data scale matters as well.
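The rule of thumb above can be turned into simple arithmetic. A minimal sketch, with hypothetical numbers (the dataset size and target are assumptions, not from the question):

```python
# Hypothetical sizing: aim for ~128 MB partitions from a 10 GB dataset
data_size_mb = 10 * 1024          # 10240 MB total
target_partition_mb = 128         # comfortably under the ~200 MB guideline
num_partitions = -(-data_size_mb // target_partition_mb)  # ceiling division
print(num_partitions)  # 80
```

You would then compare 80 against 1x to 4x your core count and repartition accordingly.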

How do I check my PySpark partition size?

In PySpark you can get the current number of partitions by calling getNumPartitions() on the RDD class; to use it with a DataFrame, first convert via df.rdd, i.e. df.rdd.getNumPartitions(). Note that this returns the partition count, not the size of each partition.


2 Answers

Use:

builder.rdd.mapPartitions(lambda it: [sum(1 for _ in it)])
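To get the sizes back to the driver you would collect() that result. The plain-Python sketch below (all data hypothetical) models what the lambda computes: mapPartitions hands each partition to the function as an iterator, and sum(1 for _ in it) consumes it to count the rows without building a list in memory.

```python
# Plain-Python model of the per-partition counting done by the lambda above:
# each partition arrives as an iterator, and the generator expression
# counts its elements lazily instead of materializing them like glom() does.
partitions = [iter([1, 2, 3]), iter([4, 5]), iter([])]  # hypothetical 3-partition RDD
sizes = [sum(1 for _ in it) for it in partitions]
print(sizes)  # [3, 2, 0]
```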
user6022341 answered Nov 13 '22


While the answer by @LostInOverflow works great, I found another way to get both the size and the index of each partition, using the code below. Thanks to this awesome post.

Here is the code:

l = test_join.rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()

and then you can find the smallest and largest partitions with:

min(l, key=lambda item: item[1])
max(l, key=lambda item: item[1])

Knowing the index of the skewed partition, we can further debug its contents if needed.
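A quick plain-Python sketch of that min/max lookup, on hypothetical (partition_index, row_count) pairs like the ones collect() returns:

```python
# Hypothetical collected output: (partition_index, row_count) pairs
l = [(0, 120), (1, 9800), (2, 75)]
smallest = min(l, key=lambda item: item[1])  # pair with the fewest rows
largest = max(l, key=lambda item: item[1])   # pair with the most rows (the skew)
print(smallest, largest)  # (2, 75) (1, 9800)
```

Here partition 1 is the skewed one, and its index is what you would use to inspect its contents.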

anwartheravian answered Nov 13 '22