EDIT 2022/02/18: I returned to this problem after a few years, and I believe my new solution below is substantially more performant than the current highest-voted solution.
Suppose I have a DataFrame with a column partition_id:
n_partitions = 2

df = spark.sparkContext.parallelize([
    [1, 'A'],
    [1, 'B'],
    [2, 'A'],
    [2, 'C']
]).toDF(('partition_id', 'val'))
How can I repartition the DataFrame to guarantee that each value of partition_id goes to a separate partition, and that there are exactly as many actual partitions as there are distinct values of partition_id?
If I do a hash partition, i.e. df.repartition(n_partitions, 'partition_id'), that guarantees the right number of partitions, but some partitions may be empty and others may contain multiple values of partition_id due to hash collisions.
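For illustration, here is a minimal sketch (reusing the example df above) that tags each row with its physical partition after the hash repartition; with only two distinct keys, both can easily hash into the same partition:

from pyspark.sql.functions import spark_partition_id

# Plain hash repartition on the key column; depending on the hash values,
# both keys may land in the same partition, leaving the other one empty.
hashed = df.repartition(n_partitions, 'partition_id')
hashed.withColumn("actual_partition_id", spark_partition_id()).show()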
There is no such option with Python and the DataFrame API. The partitioning API in Dataset is not pluggable and supports only the predefined range and hash partitioning schemes.
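For reference, a quick sketch of those two built-in schemes (assuming Spark 2.3+, where repartitionByRange is available), applied to the question's example df:

# The two predefined partitioning schemes exposed by the DataFrame API.
hash_part = df.repartition(n_partitions, "partition_id")           # hash partitioning
range_part = df.repartitionByRange(n_partitions, "partition_id")   # range partitioning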
You can convert the data to an RDD, partition it with a custom partitioner, and convert it back to a DataFrame:
from pyspark.sql.functions import col, struct, spark_partition_id

# Map each distinct partition_id to a dense partition index 0..n-1.
mapping = {k: i for i, k in enumerate(
    df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}

# Pair each row with its key, partition by the mapping, then rebuild the DataFrame.
result = (df
    .select("partition_id", struct([c for c in df.columns]))
    .rdd.partitionBy(len(mapping), lambda k: mapping[k])
    .values()
    .toDF(df.schema))
result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# | 1| A| 0|
# | 1| B| 0|
# | 2| A| 1|
# | 2| C| 1|
# +------------+---+-------------------+
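As a quick sanity check (not in the original answer), you can also confirm the layout on the underlying RDD:

# One physical partition per distinct partition_id, and for the example
# data above two rows in each.
print(result.rdd.getNumPartitions())         # 2
print(result.rdd.glom().map(len).collect())  # [2, 2]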
Please remember that this only creates a specific distribution of the data; it doesn't set a partitioner that can be used by the Catalyst optimizer.
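One way to observe this (a sketch, not part of the original answer): an aggregation on partition_id still plans a shuffle, because Catalyst has no knowledge of the custom distribution.

# The physical plan still contains an Exchange (shuffle) step, since the
# custom distribution above is invisible to the optimizer.
result.groupBy("partition_id").count().explain()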