I am trying to write out a large partitioned dataset to disk with Spark and the partitionBy
algorithm is struggling with both of the approaches I've tried.
The partitions are heavily skewed - some of the partitions are massive and others are tiny.
Problem #1:
When I use repartition before repartitionBy
, Spark writes all partitions as a single file, even the huge ones
val df = spark.read.parquet("some_data_lake")
df
.repartition('some_col).write.partitionBy("some_col")
.parquet("partitioned_lake")
This takes forever to execute because Spark isn't writing the big partitions in parallel. If one of the partitions has 1TB of data, Spark will try to write the entire 1TB of data as a single file.
Problem #2:
When I don't use repartition
, Spark writes out way too many files.
This code will write out an insane number of files.
df.write.partitionBy("some_col").parquet("partitioned_lake")
I ran this on a tiny 8 GB data subset and Spark wrote out 85,000+ files!
When I tried running this on a production data set, one partition that has 1.3 GB of data was written out as 3,100 files.
What I'd like
I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.
What is my best path forward?
Repartition works by creating new partitions and doing a full shuffle to move data around. Results in more or less equal sized partitions. Since a full shuffle takes place, repartition is less performant than coalesce.
PySpark partitionBy() is used to partition based on column values while writing DataFrame to Disk/File system. When you write DataFrame to Disk by calling partitionBy() Pyspark splits the records based on the partition column and stores each partition data into a sub-directory.
We need to change/rewrite our ETL logic to perform a left join with the not_null table and execute a union with the null column as ultimately null keys won't participate in the join. Hence, we can avoid a shuffle and the GC Pause issue on the table by following this technique with large null values. select ord.
Spark DataFrame coalesce() is used only to decrease the number of partitions. This is an optimized or improved version of repartition() where the movement of the data across the partitions is fewer using coalesce.
The simplest solution is to add one or more columns to repartition
and explicitly set the number of partitions.
val numPartitions = ???
df.repartition(numPartitions, $"some_col", $"some_other_col")
.write.partitionBy("some_col")
.parquet("partitioned_lake")
where:
numPartitions
- should be an upper bound (actual number can be lower) of the desired number of files written to a partition directory.$"some_other_col"
(and optional additional columns) should have high cardinality and be independent of the $"some_column
(there should be functional dependency between these two, and shouldn't be highly correlated).
If data doesn't contain such column you can use o.a.s.sql.functions.rand
.
import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.
The currently accepted answer is probably good enough most of the time, but doesn't quite deliver on the request that the 0.3 GB partition get written out to a single file. Instead, it will write out numPartitions
files for every output partition directory, including the 0.3 GB partition.
What you're looking for is a way to dynamically scale the number of output files by the size of the data partition. To do that, we'll build on 10465355's approach of using rand()
to control the behavior of repartition()
, and scale the range of rand()
based on the number of files we want for that partition.
It's difficult to control partitioning behavior by output file size, so instead we'll control it using the approximate number of rows we want per output file.
I'll provide a demonstration in Python, but the approach is basically the same in Scala.
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
spark = SparkSession.builder.getOrCreate()
skewed_data = (
spark.createDataFrame(
[(1,)] * 100 + [(2,)] * 10 + [(3,), (4,), (5,)],
schema=['id'],
)
)
partition_by_columns = ['id']
desired_rows_per_output_file = 10
partition_count = skewed_data.groupBy(partition_by_columns).count()
partition_balanced_data = (
skewed_data
.join(partition_count, on=partition_by_columns)
.withColumn(
'repartition_seed',
(
rand() * partition_count['count'] / desired_rows_per_output_file
).cast('int')
)
.repartition(*partition_by_columns, 'repartition_seed')
)
This approach will balance the size of the output files, no matter how skewed the partition sizes are. Every data partition will get the number of files it needs so that each output file has roughly the requested number of rows.
A prerequisite of this approach is calculating the size of each partition, which you can see in partition_count
. It's unavoidable if you really want to dynamically scale the number of output files per partition.
To demonstrate this is doing the right thing, let's inspect the partition contents:
from pyspark.sql.functions import spark_partition_id
(
skewed_data
.groupBy('id')
.count()
.orderBy('id')
.show()
)
(
partition_balanced_data
.select(
*partition_by_columns,
spark_partition_id().alias('partition_id'),
)
.groupBy(*partition_by_columns, 'partition_id')
.count()
.orderBy(*partition_by_columns, 'partition_id')
.show(30)
)
Here's what the output looks like:
+---+-----+
| id|count|
+---+-----+
| 1| 100|
| 2| 10|
| 3| 1|
| 4| 1|
| 5| 1|
+---+-----+
+---+------------+-----+
| id|partition_id|count|
+---+------------+-----+
| 1| 7| 9|
| 1| 49| 6|
| 1| 53| 14|
| 1| 117| 12|
| 1| 126| 10|
| 1| 136| 11|
| 1| 147| 15|
| 1| 161| 7|
| 1| 177| 7|
| 1| 181| 9|
| 2| 85| 10|
| 3| 76| 1|
| 4| 197| 1|
| 5| 10| 1|
+---+------------+-----+
As desired, each output file has roughly 10 rows. id=1
gets 10 partitions, id=2
gets 1 partition, and id={3,4,5}
each get 1 partition.
This solution balances the output file sizes, regardless of data skew, and without limiting parallelism by relying on maxRecordsPerFile
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With