I have the following code in Spark:
myData.filter(t => t.getMyEnum() == null)
.map(t => t.toString)
.saveAsTextFile("myOutput")
There are 2000+ files in the myOutput folder, but only a few records satisfy t.getMyEnum() == null, so there are very few output records. Since I don't want to hunt for a handful of records across 2000+ output files, I tried to combine the output using coalesce, like below:
myData.filter(t => t.getMyEnum() == null)
.map(t => t.toString)
.coalesce(1, false)
.saveAsTextFile("myOutput")
Then the job becomes EXTREMELY SLOW! I am wondering why it is so slow. There are only a few output records scattered across 2000+ partitions. Is there a better way to solve this problem?
When Spark writes data to storage systems like HDFS or S3, it can produce a large number of small files. This is mainly because Spark is a parallel processing system: writes are performed by many tasks, and each task writes its own file per partition. For example, if spark.sql.shuffle.partitions is left at its default of 200, a write that follows a shuffle produces up to 200 output files.
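To make that concrete, here is a minimal sketch (the SparkSession spark, the DataFrame df, and the column "key" are hypothetical names, not from the question) showing how the shuffle-partition setting controls the output file count for DataFrame writes:

// Sketch only: "spark", "df", and "key" are assumed, illustrative names.
// With the default spark.sql.shuffle.partitions = 200, a write after a
// shuffle produces up to 200 part files; lowering the setting produces fewer.
spark.conf.set("spark.sql.shuffle.partitions", "8")
df.groupBy("key").count()    // the shuffle now yields 8 partitions
.write.csv("myOutputCsv")    // so at most 8 part files are written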
Sometimes Spark runs slowly simply because too many concurrent tasks are running. High concurrency is normally a beneficial feature: it gives Spark fine-grained resource sharing, which maximizes utilization and cuts query latencies. But when each task processes only a handful of records, per-task scheduling overhead dominates the actual work.
repartition() works by creating new partitions and doing a full shuffle to move data around, which results in more or less equal-sized partitions. Since a full shuffle takes place, repartition is less performant than coalesce.
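As a small sketch (sc and nums are illustrative names), the difference shows up in how the partition count is reached:

val nums = sc.parallelize(1 to 1000, 200)  // start with 200 partitions
nums.repartition(10).getNumPartitions      // 10, produced by a full shuffle
nums.coalesce(10).getNumPartitions         // 10, by merging existing partitions with no shuffle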
One important point to note: Spark's repartition() and coalesce() can be very expensive operations, since they may shuffle data across many partitions, so try to minimize repartitioning as much as possible.
If you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This adds a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
Note: With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner.
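For example (again with illustrative names), coalescing to a larger partition count is a no-op unless shuffle = true:

val skewed = sc.parallelize(1 to 1000, 100)             // 100 partitions
skewed.coalesce(1000).getNumPartitions                  // still 100: cannot grow without a shuffle
skewed.coalesce(1000, shuffle = true).getNumPartitions  // 1000, redistributed by a hash partitioner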
So try passing shuffle = true to the coalesce function, i.e.:
myData.filter(_.getMyEnum == null)
.map(_.toString)
.coalesce(1, shuffle = true)
.saveAsTextFile("myOutput")
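For what it's worth, RDD.repartition(n) is implemented as coalesce(n, shuffle = true), so an equivalent way to write this is:

myData.filter(_.getMyEnum == null)
.map(_.toString)
.repartition(1)
.saveAsTextFile("myOutput")

Either way, the filter and map still run in parallel across the original 2000+ partitions; only the final write is funneled into a single task.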