Spark: coalesce very slow even though the output data is very small

I have the following code in Spark:

myData.filter(t => t.getMyEnum() == null)
      .map(t => t.toString)
      .saveAsTextFile("myOutput")

There are 2000+ files in the myOutput folder, but only a few records have t.getMyEnum() == null, so there are very few output records. Since I don't want to search through 2000+ output files for just a few records, I tried to combine the output using coalesce, as below:

myData.filter(t => t.getMyEnum() == null)
      .map(t => t.toString)
      .coalesce(1, false)
      .saveAsTextFile("myOutput")

Then the job becomes EXTREMELY SLOW! Why is it so slow? There are only a few output records, scattered across 2000+ partitions. Is there a better way to solve this problem?

Asked Jun 25 '15 at 17:06 by Edamame


1 Answer

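If you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is). That is what happens in your job: coalesce(1, false) is a narrow transformation, so Spark pipelines the upstream filter and map into the single output task, and that one task ends up reading and filtering all 2000+ input partitions by itself.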
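A minimal sketch of the difference, assuming spark-core is on the classpath and Spark runs in local mode. The object `CoalesceTaskDemo` and its `upstreamTaskIds` helper are hypothetical names used only for illustration; the sketch records which task (via `TaskContext.get().partitionId()`) evaluates the `map` step for the few records that survive the filter.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Sketch: which task evaluates the upstream filter/map when the result is
// coalesced to a single partition, with and without a shuffle.
object CoalesceTaskDemo {
  // Hypothetical helper: returns the partition ids of the tasks that ran
  // the map step for the few records that survive the filter.
  def upstreamTaskIds(shuffle: Boolean): Set[Int] = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("coalesce-task-demo")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 20000, numSlices = 200)    // 200 upstream partitions
        .filter(_ % 5000 == 0)                       // only 4 records survive
        .map(_ => TaskContext.get().partitionId())   // id of the task running this map
        .coalesce(1, shuffle = shuffle)
        .collect()
        .toSet
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    // shuffle = false: the map is pipelined into the one coalesced task.
    println(s"shuffle = false -> ${upstreamTaskIds(shuffle = false)}")
    // shuffle = true: the map runs in the 200 upstream tasks before the shuffle.
    println(s"shuffle = true  -> ${upstreamTaskIds(shuffle = true)}")
  }
}
```

With `shuffle = false` every surviving record should report task 0, because the whole upstream pipeline runs inside the single coalesced task; with `shuffle = true` the records report the ids of the separate upstream tasks that ran before the shuffle.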

Note: With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner.
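A short sketch of this point in local mode, assuming spark-core is on the classpath (`CoalesceGrowDemo` and `partitionCounts` are hypothetical names): without a shuffle, asking for more partitions than the parent has leaves the count unchanged, while `shuffle = true` redistributes the data into the requested number.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: coalesce without a shuffle can only merge partitions, whereas
// shuffle = true can also raise the partition count.
object CoalesceGrowDemo {
  // Hypothetical helper: partition counts before and after asking a
  // 100-partition RDD for 1000 partitions, without and with a shuffle.
  def partitionCounts(): (Int, Int, Int) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("coalesce-grow-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100000, numSlices = 100)
      val narrow = rdd.coalesce(1000)                 // cannot grow: stays at 100
      val wide = rdd.coalesce(1000, shuffle = true)   // hash-distributed into 1000
      (rdd.getNumPartitions, narrow.getNumPartitions, wide.getNumPartitions)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, narrow, wide) = partitionCounts()
    println(s"before = $before, coalesce(1000) = $narrow, coalesce(1000, shuffle = true) = $wide")
  }
}
```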

So try passing shuffle = true to coalesce, i.e.:

myData.filter(_.getMyEnum == null)
      .map(_.toString)
      .coalesce(1, shuffle = true)
      .saveAsTextFile("myOutput")
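An end-to-end sketch of the suggested fix, assuming local mode and a hypothetical `MyRecord` class standing in for the question's record type (only every 1000th record has a null enum). `SingleFileOutput.run` is likewise a hypothetical helper: it writes the filtered records with `coalesce(1, shuffle = true)` into a temporary directory and counts the `part-` files produced.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the question's record type: only every
// 1000th record has a null enum value.
case class MyRecord(id: Int, myEnum: String) {
  def getMyEnum: String = myEnum
}

object SingleFileOutput {
  // Hypothetical helper: writes the filtered records as one text file and
  // returns (number of part files written, number of lines written).
  def run(): (Int, Long) = {
    // saveAsTextFile fails if the target directory exists, so point it at a
    // not-yet-existing path inside a fresh temporary directory.
    val out = new File(Files.createTempDirectory("coalesce-demo").toFile, "myOutput").getPath
    val conf = new SparkConf().setMaster("local[4]").setAppName("single-file-output")
    val sc = new SparkContext(conf)
    try {
      val myData = sc.parallelize(1 to 200000, numSlices = 2000) // mimics 2000+ partitions
        .map(i => MyRecord(i, if (i % 1000 == 0) null else "A"))

      myData.filter(_.getMyEnum == null)
        .map(_.toString)
        .coalesce(1, shuffle = true) // equivalent to .repartition(1)
        .saveAsTextFile(out)

      val partFiles = new File(out).listFiles().count(_.getName.startsWith("part-"))
      (partFiles, sc.textFile(out).count())
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (partFiles, lines) = run()
    println(s"part files: $partFiles, lines: $lines")
  }
}
```

`repartition(1)` would behave the same here, since `RDD.repartition(n)` is defined as `coalesce(n, shuffle = true)`. The shuffle only moves the few records that survive the filter, so it should be cheap compared with funnelling all 2000 input partitions through one task.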

Answered Sep 21 '22 at 19:09 by Zia Kiyani