I have two dataframes: df1 with 22 million records and df2 with 2 million records. I'm doing a right join on email_address as the key.
test_join = df2.join(df1, "email_address", how = 'right').cache()
There are very few duplicate emails (if any) in either dataframe. After the join I'm trying to find the partition sizes of the resulting dataframe test_join, using this code:
l = test_join.rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
print(max(l, key=lambda item: item[1]), min(l, key=lambda item: item[1]))
The result shows that the largest partition is around 100 times bigger than the average partition. This skew in partition size is causing performance problems in post-join transformations and actions.
I know I can repartition it evenly after the join with repartition(num_partitions), but my question is: why am I getting this uneven partitioning in the first place, and is there any way to avoid it?
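(For reference, the post-join rebalancing I mean is just something like the following; the partition count is arbitrary:)

test_join = test_join.repartition(200)  # arbitrary partition count, chosen only for illustration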
P.S.: Just to check whether the problem was specific to hashing of the email_address key, I also looked at the partition sizes of a couple of other joins, and I saw the same issue with a numeric join key as well.
Make a tuple of (item, index), where the index is over the entire RDD. Swap the key and the value, so the RDD now contains (index, item). Repartition to N partitions using an identity partitionFunc, moving each item to partition index % N.
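Roughly in code (a sketch; the source rdd and the target partition count N are placeholders):

N = 200  # target number of partitions (placeholder)

evenly_distributed = (
    rdd.zipWithIndex()                            # (item, index) with a global index
       .map(lambda kv: (kv[1], kv[0]))            # swap to (index, item)
       .partitionBy(N, partitionFunc=lambda i: i) # identity; Spark takes the result modulo N
       .values()                                  # drop the index, keep the items
)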
Let's see how to identify and mitigate skew in your data. Step 1: Read the data from the table into a dataframe. Step 2: Find the number of rows per partition. Here the function spark_partition_id() returns the current partition id; by plotting the result you will notice the skew.
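A minimal sketch of step 2, assuming the data from step 1 is already loaded as a dataframe df:

from pyspark.sql.functions import spark_partition_id

rows_per_partition = (
    df.withColumn("partition_id", spark_partition_id())  # tag each row with its partition id
      .groupBy("partition_id")
      .count()
      .orderBy("partition_id")
)
rows_per_partition.show()  # or convert to pandas and plot to see the skew visually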
@user6910411 you were spot on. The problem was with my data: there was some dumb convention followed to enter empty emails, which was causing this skewed-key issue.
Upon inspecting the entries in the largest partition, I figured out what was going on in there. I found this debugging technique quite useful, and I'm sure it could help others facing the same issue.
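One way to deal with it, as a rough sketch (the empty-string placeholder below is only an assumption about what those dummy emails look like), is to filter those keys out of both dataframes before the join, and union those rows back in afterwards if they are still needed:

from pyspark.sql.functions import col

valid = (col("email_address").isNotNull()) & (col("email_address") != "")  # hypothetical placeholder value
df1_clean = df1.filter(valid)
df2_clean = df2.filter(valid)

test_join = df2_clean.join(df1_clean, "email_address", how="right").cache()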
BTW, this is the function I wrote to find the skewness of the RDD partitions:
from itertools import islice

def check_skewness(df):
    sampled_rdd = df.sample(False, 0.01).rdd.cache()  # take a 1% sample to keep this fast
    l = sampled_rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
    max_part = max(l, key=lambda item: item[1])
    min_part = min(l, key=lambda item: item[1])
    if max_part[1] / min_part[1] > 5:  # largest partition is more than 5x the smallest
        print('Partitions Skewed: Largest Partition', max_part, 'Smallest Partition', min_part,
              '\nSample Content of the largest Partition:')
        print(sampled_rdd.mapPartitionsWithIndex(
            lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
    else:
        print('No Skewness: Largest Partition', max_part, 'Smallest Partition', min_part)
and then I just pass the dataframe for which I want to check the skewness like this:
check_skewness(test_join)
and it gives me nice information about its skewness.