 

Partitioning of Data Frame in Pyspark using Custom Partitioner

Looking for some info on using a custom partitioner in PySpark. I have a DataFrame holding country data for various countries. If I repartition on the country column, it distributes my data into n partitions, keeping each country's data in a specific partition. This creates skewed partitions, which I can see using the glom() method.

Some countries, like USA and CHN, have a huge amount of data in this DataFrame. I want to repartition my DataFrame so that if the country is USA or CHN it is further split into some 10 partitions, while the other countries like IND, THA, AUS etc. keep one partition each. Can we extend the partitioner class in PySpark code?

I have read in the link below that we can extend the Scala Partitioner class in a Scala Spark application and modify it with custom logic to repartition our data based on requirements, like the one I have. Please help me achieve this in PySpark. See the link below: What is an efficient way to partition by column but maintain a fixed partition count?


I am using Spark version 2.3.0.2, and below is my DataFrame structure:

datadf = spark.sql("""
    SELECT
        ID_NUMBER, SENDER_NAME, SENDER_ADDRESS, REGION_CODE, COUNTRY_CODE
    FROM udb.sometable
""")

The incoming data has records for six countries: AUS, IND, THA, RUS, CHN and USA. CHN and USA have skewed data.

So if I repartition on COUNTRY_CODE, two partitions contain a lot of data whereas the others are fine. I checked this using the glom() method.

newdf = datadf.repartition("COUNTRY_CODE")
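
A minimal sketch of that glom() check (reusing the newdf variable above):

# count the rows landing in each partition of the repartitioned DataFrame
sizes = newdf.rdd.glom().map(len).collect()
for i, n in enumerate(sizes):
    print("partition %d: %d rows" % (i, n))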

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3, "COUNTRY_CODE", "USA")

I was trying to repartition my data into 3 more partitions for USA and CHN only, and would like to keep the data for the other countries in a single partition each.

This is what I am expecting:
AUS - one partition
IND - one partition
THA - one partition
RUS - one partition
CHN - three partitions
USA - three partitions

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1182, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'repartitionByRange'

asked Oct 13 '18 by vikrant rana

People also ask

How do I partition a DataFrame in PySpark?

PySpark's partitionBy() is used to partition based on column values while writing a DataFrame to disk/file system. When you write a DataFrame to disk using partitionBy(), PySpark splits the records based on the partition column and stores each partition's data in a sub-directory.
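
For illustration, a minimal sketch (the output path here is made up), reusing the datadf DataFrame from the question:

# each distinct COUNTRY_CODE value becomes a sub-directory such as
# .../COUNTRY_CODE=USA under the output path
datadf.write \
    .partitionBy("COUNTRY_CODE") \
    .mode("overwrite") \
    .parquet("/tmp/output/by_country")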

What is custom partitioning in Spark?

Custom partitioning lets you alter the size and number of partitions as per your application's needs. Here, you can define which key should go to which partition. You provide an explicit partitioner by calling the partitionBy method on a paired RDD.
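
For example, a sketch of an explicit partitioner on a paired RDD (the routing logic here is invented for illustration):

# key the rows by country so partitionBy can route them
pairs = datadf.rdd.map(lambda row: (row["COUNTRY_CODE"], row))

def country_partitioner(key):
    # send the two heavy countries to their own partitions,
    # everything else to a shared third one
    if key == "USA":
        return 0
    if key == "CHN":
        return 1
    return 2

partitioned = pairs.partitionBy(3, country_partitioner)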

How can you create an RDD with specific partitioning?

A loaded RDD is partitioned by the default partitioner: hash code. To specify a custom partitioner, call rdd.partitionBy() with your own partitioner, as in the sketch above.

How do I create a partition table in PySpark?

Partition in memory: you can partition or repartition the DataFrame by calling the repartition() or coalesce() transformations. Partition on disk: while writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns using partitionBy() of pyspark.sql.DataFrameWriter.
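
A quick sketch of the in-memory side (partition counts chosen arbitrarily):

bycol = datadf.repartition(10, "COUNTRY_CODE")   # full shuffle, hashed by column
fewer = bycol.coalesce(4)                        # merges partitions, avoids a full shuffle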


1 Answer

Try something like this with hashing:

newDf = oldDf.repartition(N, "col1", "coln")

or for the range-based approach:

newDF = oldDF.repartitionByRange(N, "col1", "coln")

There is no custom partitioner for DataFrames just yet. Also note that repartitionByRange was only added to PySpark in Spark 2.4, which is why your 2.3 build raises the AttributeError above.

In your case I would go for hashing, but there are no guarantees.

But if your data is skewed you may need some extra work; using two columns for partitioning is the simplest approach.

E.g. an existing or new column - in this case a column that assigns a grouping within a given country, say 1 .. N - and then partition on the two columns.

For countries with a lot of data you get N synthetic sub-divisions; for the others with low cardinality, only one such group number. Not too hard. Both kinds of partitioning can take more than one column.

In my view, filling partitions uniformly takes a lot of effort and is not really attainable, but a next-best approach like this one can suffice well enough. It amounts to custom partitioning to an extent.

Otherwise, using .withColumn on a DataFrame you can simulate custom partitioning with those rules: fill a new column and then apply repartitionByRange (or plain repartition). Also not so hard.
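
Sketching that idea against the question's data (the salt logic and column names are my own illustration; plain repartition is used since repartitionByRange needs Spark 2.4+):

from pyspark.sql import functions as F

skewed = ["USA", "CHN"]

# invented salt column: 3 synthetic sub-groups for the heavy
# countries, a single group (0) for everyone else
salted = datadf.withColumn(
    "salt",
    F.when(F.col("COUNTRY_CODE").isin(skewed),
           (F.rand() * 3).cast("int"))
     .otherwise(F.lit(0)))

# hash-partition on both columns; with enough partitions each
# (country, salt) pair tends to land in its own partition, which is
# close to the CHN/USA = 3, others = 1 layout asked for - but hashing
# gives no hard guarantee of one pair per partition
newdf = salted.repartition(10, "COUNTRY_CODE", "salt")

Checking newdf.rdd.glom().map(len).collect() afterwards should show the former skew spread across the salted partitions.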

answered Oct 25 '22 by thebluephantom