New Spark user here. I'm extracting features from many .tif images stored on AWS S3, each with identifier like 02_R4_C7. I'm using Spark 2.2.1 and hadoop 2.7.2.
I'm using all default configurations like so:
conf = SparkConf().setAppName("Feature Extraction")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
And here is the function call that this fails on after some features are successfully saved in an image id folder as part-xxxx.gz files:
features_labels_rdd.saveAsTextFile(text_rdd_direct,"org.apache.hadoop.io.compress.GzipCodec")
See error below. When I delete the feature part-xxxx.gz files that were successfully created and rerun the script, it fails at a different image and part-xxxxx.gz in a seemingly nondeterminsitic way. I make sure to remove all features before rerunning. My theory is that two workers are trying to create the same temp file and are conflicting with each other, since there are two identical error messages for the same file, but one second apart.
I'm at a loss about what to do about this, I've seen that spark lists configurations that can change how spark handles tasks but I'm not sure what would help here since I don't understand the issue I'm having. Any help is greatly appreciated!
SLF4J: Class path contains multiple SLF4J bindings.
*SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-
log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:24:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:24:41 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
[Stage 3:=================> (6 + 14) / 20]18/06/26 19:24:58 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626192453_0003_m_000007_59
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 3:=====================================> (13 + 7) / 20]18/06/26 19:24:58 ERROR executor.Executor: Exception in task 7.0 in stage 3.0 (TID 59)
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/06/26 19:24:58 ERROR scheduler.TaskSetManager: Task 7 in stage 3.0 failed 1 times; aborting job
Traceback (most recent call last):
File "run_feature_extraction_spark.py", line 88, in <module>
main(sc)
File "run_feature_extraction_spark.py", line 75, in main
features_labels_rdd.saveAsTextFile(text_rdd_direct, "org.apache.hadoop.io.compress.GzipCodec")
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 1551, in saveAsTextFile
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 3.0 failed 1 times, most recent failure: Lost task 7.0 in stage 3.0 (TID 59, localhost, executor driver): java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz*
And when I run it again, the script makes it farther but fails with the same error with a different image folder and part-xxxx.gz file
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:37:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:37:24 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
Feature file of 02_R4_C6 is created
Feature file of 02_R4_C5 is created
Feature file of 02_R4_C4 is created
Feature file of 02_R4_C3 is created
Feature file of 02_R4_C2 is created
Feature file of 02_R4_C1 is created
[Stage 15:==========================================> (15 + 5) / 20]18/06/26 19:38:16 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626193811_0015_m_000017_285
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/_temporary/0/_temporary/attempt_20180626193811_0015_m_000017_285/part-00017.gz; isDirectory=false; length=896020; replication=1; blocksize=67108864; modification_time=1530041897000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/part-00017.gz
It's not safe to use S3 as a direct destination of work without a "consistency layer" (Consistent EMR, or from the Apache Hadoop project itself, S3Guard), or a Special output committer designed explicitly for work with S3 (Hadoop 3.1+ "the S3A committers"). Rename is where things fail, as listing inconsistency means that the scan for files to copy may miss data, or find deleted files which it can't rename. Your stack trace looks exactly how I'd expect this to surface: job commits failing apparently at random.
Rather than go into the details, here's a video of Ryan Blue on the topic
Workaround: write to your local cluster FS then use distcp to upload to S3.
PS: for Hadoop 2.7+, switch to the s3a:// connector. It has exactly the same consistency problem without S3Guard enabled, but better performance.
The solutions in @Steve Loughran post are great. Just to add a little info to help explaining the issue.
Hadoop-2.7 uses Hadoop Commit Protocol for committing. When Spark saves result to S3, it actually saves temporary result to S3 first and make it visible by renaming it when job succeeds (reason and detail can be found in this great doc). However, S3 is an object store and does not have real "rename"; it copy the data to target object, then delete original object.
S3 is "eventually consistent", which means the delete operation could happen before copy is fully synced. When this happens, the rename would fail.
In my cases, this was only triggered in some chained jobs. I haven't seen this in simple save job.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With