
Setting spark.speculation in Spark 2.1.0 while writing to s3

I am running a large Spark 2.1.0 job that ends by writing its results to S3. It runs on a 30-node cluster and for the most part works fine. However, occasionally I have to stop the job and run it again because a single node gets stuck while writing, even after all the computation is done. I am wondering whether I can mitigate this issue by turning speculation on. I read in another post that this may be harmful and lead to duplicate results or data corruption. Can anyone advise? I was also advised to use the Hadoop default committer by specifying the following setting in my spark-defaults.conf. I am running Spark standalone.

 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

Any clarification on this issue would be greatly appreciated.

femibyte asked Sep 23 '17

1 Answer

Update: If you use AWS Elastic MapReduce (EMR), clusters on release 5.19 or later can now safely use speculative execution, but your Spark job can still fail part-way through and leave incomplete results.

The partial data from an incomplete run is still queryable if you scan AWS S3 directly, which can lead to incorrect results in downstream jobs, so you need a strategy to deal with that!
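If you go the EMR route, both knobs live in spark-defaults.conf (or the spark-defaults configuration classification when creating the cluster). A minimal sketch; the committer property name is the one documented in the AWS links further down, so verify it against your EMR release:

 spark.speculation true
 spark.sql.parquet.fs.optimized.committer.optimization-enabled true

On more recent EMR releases the optimized committer is enabled by default, so only speculation needs switching on.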

If you are running Spark 2.3.0 or greater, I would recommend writing new partitions to a deterministic location using SaveMode.Overwrite and retrying on failure; this avoids duplicate or corrupt data in your output.

If you are using SaveMode.Append then retrying a Spark job will produce duplicate data in your output.

The recommended approach:

df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("date")
  .parquet("s3://myBucket/path/to/table.parquet")

Then on successful writing of a partition, atomically register it to a metastore such as Hive, and query Hive as your source of truth, not S3 directly.

E.g.

ALTER TABLE my_table ADD PARTITION (date='2019-01-01') LOCATION 's3://myBucket/path/to/table.parquet/date=2019-01-01'

If your Spark job fails and you are using SaveMode.Overwrite, then it is always safe to retry, because the data has not yet been made available to metastore queries and you are only overwriting data in the failed partition.

Note: In order to overwrite only specific partitions rather than the entire dataset, you need to configure:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

which is only available from Spark 2.3.0.
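Putting the pieces above together, here is a minimal sketch of the write-then-register flow. The table my_table, the staging_results source, the bucket path and the date value are placeholders carried over from the examples above, not a definitive implementation:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("s3-partition-write")
  .enableHiveSupport()
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Spark 2.3.0+
  .getOrCreate()

// placeholders: staging_results and the date value are purely illustrative
val df = spark.table("staging_results")
val date = "2019-01-01"

// SaveMode.Overwrite with dynamic partition overwrite only replaces the
// partitions present in df, so a failed run can simply be retried
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("date")
  .parquet("s3://myBucket/path/to/table.parquet")

// only after the write succeeds is the partition exposed to metastore readers
spark.sql(
  s"""ALTER TABLE my_table ADD IF NOT EXISTS PARTITION (date='$date')
     |LOCATION 's3://myBucket/path/to/table.parquet/date=$date'""".stripMargin)

Because the partition is only added to the metastore after the write succeeds, downstream jobs that query through Hive never see a half-written partition, and a failed run can simply be retried.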

https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html

You might also want to consider the Iceberg project as an alternative to a Hive / Glue metastore as it matures. https://github.com/apache/incubator-iceberg
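For a rough flavour only (the API was still evolving while Iceberg was incubating, so treat the format name and table identifier below as assumptions to check against the Iceberg docs for your Spark version): an Iceberg table commit is an atomic metadata swap, so the ALTER TABLE registration step above goes away.

// hypothetical: requires the iceberg-spark runtime on the classpath and an
// Iceberg table db.my_table already created in the catalog
df.write
  .format("iceberg")
  .mode("append")
  .save("db.my_table")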

Some background on why this is necessary, and for non-AWS users

Running with Spark speculation on when committing to an object store is usually a VERY bad idea, depending on what is looking at that data downstream and your consistency model.

Ryan Blue from Netflix has an excellent (and pretty funny) talk which explains exactly why: https://www.youtube.com/watch?v=BgHrff5yAQo

Judging by the OP's description I suspect they are writing Parquet.

The TL;DR version is that in S3 a rename operation is actually a copy and delete under the hood, and this has consistency implications. Usually in Spark, output data is written to a temporary file location and renamed when the calculation is complete. This means that if speculative execution is on, multiple executors can be working on the same result, and the one that finishes first 'wins' by renaming its temp file to the final result while the other task is cancelled. This rename happens in a single task to ensure that only one speculative task wins, which is not a problem on HDFS since a rename is a cheap metadata operation; a few thousand or even a million of them take very little time.

But when using S3, a rename is not an atomic metadata operation; it is actually a copy, which takes time. Therefore you can get into a situation where you have to copy a whole bunch of files in S3 a second time for the rename, in series, and this synchronous operation is what is causing your slowdown. If your executor has multiple cores, you may actually have one task clobber the results of another, which should be OK in theory because one file ends up winning, but you're not in control of what is happening at that point.

The issue with this is, what happens if the final rename task fails? You end up with some of your files committed to S3 and not all of them, which means partial/duplicate data and lots of problems downstream depending on your application.

While I don't like it, the prevailing wisdom presently is to write locally to HDFS, then upload the data with a tool like S3Distcp.

Have a look at HADOOP-13786. Steve Loughran is the go-to guy for this issue.

If you don't want to wait, Ryan Blue has a repo, "rdblue/s3committer", which allows you to fix this for all outputs except Parquet files, but it looks like a bit of work to integrate and subclass correctly.

Update: HADOOP-13786 has now been fixed and released in the Hadoop 3.1 libraries. At present Steve Loughran is working on getting a fix based on the Hadoop 3.1 libs merged into apache/spark (SPARK-23977); however, the latest word from the ticket's comment thread is that the fix will not be merged before Spark 2.4 is released, so we may be waiting a bit longer for this to become mainstream.

Update v2: You can halve the window of time in which the final output-partition rename task may fail by setting mapreduce.fileoutputcommitter.algorithm.version to 2 in your Hadoop config, since the original output-commit mechanism actually performed two renames.
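In Spark this is the same setting the question already shows for spark-defaults.conf; a sketch of setting it on the session builder instead (the spark.hadoop. prefix passes the property through to the Hadoop configuration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // equivalent to the spark-defaults.conf line from the question
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()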

Wade Jensen answered Sep 20 '22