 

Spark writes Parquet to S3: the last task takes forever

I'm writing a Parquet file from a DataFrame to S3. When I look at the Spark UI, I can see that all but one of the tasks in the write stage completed swiftly (e.g. 199/200). This last task appears to take forever to complete, and very often it fails by exceeding the executor memory limit.

I'd like to know what is happening in this last task. How can I optimize it? Thanks.

asked Aug 04 '15 by user2680514

3 Answers

I have tried Glennie Helles Sindholt's solution and it works very well. Here is the code:

path = 's3://...'
n = 2 # number of repartitions, try 2 to test
spark_df = spark_df.repartition(n)
spark_df.write.mode("overwrite").parquet(path)
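
If you are unsure what value to use for n, a quick look at how the DataFrame is currently partitioned can help. A minimal sketch (assuming the spark_df from above and an active SparkSession; not part of the original answer):

from pyspark.sql.functions import spark_partition_id

# How many partitions the DataFrame has before repartitioning
print(spark_df.rdd.getNumPartitions())

# Rows per partition; one partition with far more rows than the rest
# points to data skew rather than simply too many partitions
(spark_df
    .groupBy(spark_partition_id().alias("partition_id"))
    .count()
    .orderBy("count", ascending=False)
    .show(10))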
answered Sep 28 '22 by bcosta12

As others have noted, data skew is likely at play.

Besides that, I noticed that your task count is 200.

The configuration parameter spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations.

200 is the default for this setting, but generally it is far from an optimal value.

For small data, 200 could be overkill and you would waste time in the overhead of multiple partitions.

For large data, 200 can result in large partitions, which should be broken down into more, smaller partitions.

The really rough rules of thumb (see the sketch below for turning them into a concrete number) are:

- have 2-3x as many partitions as CPU cores, or
- aim for roughly 128 MB per partition.
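
A rough sketch of applying those rules of thumb; the input size and core count below are made-up placeholders you would replace with your own job's numbers:

# Hypothetical job parameters -- substitute your own
total_size_bytes = 50 * 1024**3              # e.g. ~50 GB of input data
total_cores = 40                             # e.g. 10 executors x 4 cores

target_partition_bytes = 128 * 1024 * 1024   # ~128 MB per partition
by_size = max(1, total_size_bytes // target_partition_bytes)
by_cores = total_cores * 3                   # 2-3x the core count
num_partitions = int(max(by_size, by_cores))
print(num_partitions)                        # 400 here: the size rule wins over 120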

2 GB is the maximum partition size. Also, if you are hovering just below 2000 partitions, be aware that Spark uses a different data structure for shuffle bookkeeping when the number of partitions is greater than 2000 [1]:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

You can try playing with this parameter at runtime:

spark.conf.set("spark.sql.shuffle.partitions", "300")
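
Note that this setting only affects stages that actually shuffle (joins, aggregations, and the like); a plain write with no preceding shuffle keeps the DataFrame's existing partitioning. A sketch of the full flow, with a hypothetical aggregation standing in for whatever produces your 200-task stage:

spark.conf.set("spark.sql.shuffle.partitions", "300")

# Hypothetical aggregation that triggers a shuffle before the write;
# the write stage will now run with 300 tasks instead of the default 200
counts = spark_df.groupBy("some_key").count()          # "some_key" is a placeholder column
counts.write.mode("overwrite").parquet("s3://bucket/output/")   # placeholder path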

[1] What should be the optimal value for spark.sql.shuffle.partitions, or how do we increase partitions when using Spark SQL?

answered Sep 28 '22 by Ryan

It sounds like you have data skew. You can fix this by calling repartition on your DataFrame before writing to S3, as sketched below.
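
For example (a sketch only; skewed_df stands for your DataFrame, and the partition count, column name, and output path are placeholders): repartition(n) without a column spreads rows round-robin, which evens out file sizes, while repartitioning by a reasonably high-cardinality column keeps related rows together but still breaks up a single hot partition.

from pyspark.sql.functions import col

# Spread the rows evenly across 64 partitions before writing
skewed_df.repartition(64).write.mode("overwrite").parquet("s3://bucket/output/")

# Or repartition by a column if the downstream layout matters ("event_id" is hypothetical)
skewed_df.repartition(64, col("event_id")).write.mode("overwrite").parquet("s3://bucket/output/")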

answered Sep 28 '22 by Glennie Helles Sindholt