Does anyone know how to speed up S3 write times from Spark running on EMR?
My Spark job takes over 4 hours to complete; however, the cluster is only under load during the first 1.5 hours.
I was curious what Spark was doing all this time. I looked at the logs and found many s3 mv
commands, one for each file. Taking a look directly at S3, I see all my files are in a _temporary directory.
Second, I'm concerned about cluster cost: it appears I only need about 2 hours of compute for this task, but I end up buying up to 5 hours. I'm curious whether EMR AutoScaling can help with cost in this situation.
Some articles discuss changing the file output committer algorithm, but I've had little success with that.
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
Writing to the local HDFS is quick. Would issuing a hadoop command to copy the data to S3 be faster?
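For context, the write itself is roughly this (bucket and paths are placeholders, not the real ones):

val df = spark.read.parquet("hdfs:///staging/events")   // placeholder input
// executors finish in the first ~1.5 hours; the remaining time is the commit
// phase moving part files out of the _temporary prefix on S3
df.write.mode("overwrite").parquet("s3://my-bucket/output/events")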
The client's storage device or system might also be a source of latency. Read or write operations to the client's storage device that experience elevated latency can impact the performance of the download or upload to Amazon S3.
HDFS and the EMR File System (EMRFS), which uses Amazon S3, are both compatible with Amazon EMR, but they're not interchangeable.
Using the spark.write.parquet() function, we can write a Spark DataFrame as a Parquet file to Amazon S3. The parquet() function is provided by the DataFrameWriter class.
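A minimal sketch of that API (the bucket and path below are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-to-s3").getOrCreate()
val df = spark.range(1000).toDF("id")                  // toy DataFrame
// DataFrameWriter.parquet() writes the DataFrame out as Parquet part files
df.write.mode("overwrite").parquet("s3://example-bucket/tmp/ids")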
Each S3 operation is an API request with significant latency — tens to hundreds of milliseconds, which adds up to pretty much forever if you have millions of objects and try to work with them one at a time.
What you are seeing is a problem with the output committer and S3. The commit job applies fs.rename
on the _temporary folder, and since S3 does not support rename, each rename becomes a copy followed by a delete, so a single job-commit step ends up copying and deleting all the files from _temporary to their final destination.
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
only works with Hadoop 2.7 or later. What it does is move each task's files out of _temporary at task commit rather than at job commit, so the work is distributed across the executors and runs pretty fast.
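A sketch of how this fits into a job, assuming sc and df are your existing SparkContext and DataFrame (the bucket is a placeholder):

// must be set before the write is issued; with version 2, each task moves its
// own part files out of _temporary at task commit, so the work runs in
// parallel on the executors instead of serially in the driver at job commit
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
df.write.mode("overwrite").parquet("s3://example-bucket/output/events")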
If you use an older version of Hadoop, I would use Spark 1.6 and set:
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
*Note that it does not work with speculation turned on or when writing in append mode.
**Also note that it was removed in Spark 2.0 (replaced by algorithm.version=2).
BTW, in my team we actually write with Spark to HDFS and use DistCp jobs (specifically s3-dist-cp) in production to copy the files to S3, but this is done for several other reasons (consistency, fault tolerance), so it is not necessary; you can write to S3 pretty fast using what I suggested.
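For completeness, a rough sketch of that HDFS-then-copy pattern on EMR (paths and bucket are placeholders):

// step 1: write from Spark to the cluster's HDFS, where rename is cheap
df.write.mode("overwrite").parquet("hdfs:///tmp/output/events")
// step 2: copy the finished files to S3 outside of Spark, e.g. from the EMR master node:
//   s3-dist-cp --src hdfs:///tmp/output/events --dest s3://example-bucket/output/events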
I had a similar use case where I used Spark to write to S3 and had a performance issue. The primary reason was that Spark was creating a lot of zero-byte part files, and renaming temp files to their actual file names was slowing down the write process. I tried the approaches below as workarounds:
1. Write the output of Spark to HDFS and use Hive to write to S3. Performance was much better, as Hive created a smaller number of part files. The problem I had (and also had when using Spark) was that the delete action was not granted by the policy in the prod environment for security reasons. The S3 bucket was KMS-encrypted in my case.
2. Write Spark output to HDFS, copy the HDFS files to local disk, and use aws s3 cp to push the data to S3 (a rough sketch of this follows the list). I had the second-best results with this approach. I created a ticket with Amazon and they suggested going with this one.
3. Use s3-dist-cp to copy files from HDFS to S3. This worked with no issues, but was not performant.
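As a rough illustration of approach 2 (paths and bucket are placeholders; in practice these are just run from the EMR master shell, shown here driven from Scala):

import scala.sys.process._

// pull the finished part files from HDFS onto local disk, then push them to S3
"hdfs dfs -get /tmp/output/events /mnt/staging/events".!
"aws s3 cp /mnt/staging/events s3://example-bucket/output/events --recursive".!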