Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How To Get Local Spark on AWS to Write to S3

I have installed Spark 2.4.3 with Hadoop 3.2 on an AWS EC2 instance. I’ve been using spark (mainly pyspark) in local mode with great success. It is nice to be able to spin up something small and then resize it when I need power, and do it all very quickly. When I really need to scale I can switch to EMR and go to lunch. It all works smoothly apart from one issue: I can’t get the local spark to reliably write to S3 (I've been using local EBS space). This is clearly something to do with all the issues outlined in the docs about S3’s limitations as a file system. However, using the latest hadoop, my reading of the docs is that should be able to get it working.

Note that I'm aware of this other post, which asks a related question; there is some guidance here, but no solution that I can see. How to use new Hadoop parquet magic commiter to custom S3 server with Spark

I have the following settings (set in various places), following my best understanding of the documentation here: https://hadoop.apache.org/docs/r3.2.1/hadoop-aws/tools/hadoop-aws/index.html

fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem  
fs.s3a.committer.name: directory   
fs.s3a.committer.magic.enabled: false  
fs.s3a.committer.threads: 8 
fs.s3a.committer.staging.tmp.path: /cache/staging  
fs.s3a.committer.staging.unique-filenames: true  
fs.s3a.committer.staging.conflict-mode: fail  
fs.s3a.committer.staging.abort.pending.uploads: true  
mapreduce.outputcommitter.factory.scheme.s3a: org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory  
fs.s3a.connection.maximum: 200  
fs.s3a.fast.upload: true  

A relevant point is that I’m saving using parquet. I see that there was some problem with the Parquet saving previously, but I don’t see this mentioned in the latest docs. Maybe this is the problem?

In any case, here is the error I’m getting, which seems indicative of the kind of error S3 gives when trying to rename the temporary folder. Is there some array of correct settings that will make this go away?

java.io.IOException: Failed to rename S3AFileStatus{path=s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/_temporary/0/_temporary/attempt_20190910022011_0004_m_000118_248/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet; isDirectory=false; length=185052; replication=1; blocksize=33554432; modification_time=1568082036000; access_time=0; owner=brett; group=brett; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE to s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:486)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:597)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:560)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
like image 431
brettc Avatar asked Oct 22 '19 01:10

brettc


1 Answers

I helped @brettc with his configuration and we found out the correct one to set.

Under $SPARK_HOME/conf/spark-defaults.conf

# Enable S3 file system to be recognise
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

# Parameters to use new commiters
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.commiter.staging.conflict-mode replace
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class     org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

If you look at the last 2 configurations lines above you see that you need org.apache.spark.internal.io library which contains PathOutputCommitProtocol and BindingParquetOutputCommitter classes. To do so you have to download spark-hadoop-cloud jar here (in our case we took version 2.3.2.3.1.0.6-1) and place it under $SPARK_HOME/jars/.

You can easily verify that you are using the new committer by creating a parquet file. The _SUCCESS file should contains a json like the one below:

{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1574729145842,
  "date" : "Tue Nov 26 00:45:45 UTC 2019",
  "hostname" : "<hostname>",
  "committer" : "directory",
  "description" : "Task committer attempt_20191125234709_0000_m_000000_0",
  "metrics" : { [...] },
  "diagnostics" : { [...] },
  "filenames" : [...]
}
like image 190
Gaylord Cherencey Avatar answered Nov 07 '22 09:11

Gaylord Cherencey