Why does my Spark job fail to write output?

Tags:

apache-spark

Setup:

I have a Spark job running on a distributed Spark cluster with 10 nodes. I am doing some text file processing on HDFS. The job runs fine until the last step: saving the output as text files.

Problem:

I get the following stacktrace:

15/04/07 11:32:11 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:235, took 1.377335791 s
Exception in thread "main" java.io.IOException: Failed to rename RawLocalFileStatus{path=file:/home/ds_myuser/tmp/myapp/out/_temporary/0/task_201504071132_0016_m_000003/part-00003; isDirectory=false; length=2494; replication=1; blocksize=33554432; modification_time=1428427931000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/home/ds_myuser/tmp/myapp/out/part-00003
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
    at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
    at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:792)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1162)
    at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:440)
    at org.apache.spark.api.java.JavaPairRDD.saveAsTextFile(JavaPairRDD.scala:45)
    at com.somecompany.analysis.myapp.control.Main.calculateRow(Main.java:235)
    at com.somecompany.analysis.myapp.control.Main.calculatemyapp(Main.java:127)
    at com.somecompany.analysis.myapp.control.Main.main(Main.java:103)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This is how I am saving the output in my Java code:

result.saveAsTextFile("/home/myuser/tmp/myapp/out");

Also, sometimes I get one part file in the output directory, and sometimes none.

Is it because I am trying to save to the local file system and there is a race condition, with all executors trying to write to the same location? But the part file names are different, so I guess that should not be an issue.

Any help is much appreciated.

EDIT 1:

I noticed one more thing. Strangely enough, some of the temporary files are owned by "root", and I cannot delete them:

[myuser@myserver ~]$ rm -rf tmp/myapp/
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/.part-00001.crc': Permission denied
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/part-00001': Permission denied

EDIT 2:

As suggested by Marius Soutier, I tried using coalesce, and also tried repartition. With these changes the job succeeds, but the output directory contains only a _SUCCESS file and no part-xxxxx files. Also, I am calling

result.count()

just before the coalesce or repartition, and it prints 260, so there is some final output. But it is not getting written out as part files.

EDIT 3:

Here is the code that writes the file; it is in the driver class:

    System.out.println("Final No. of Output Lines: " + result.count());
    result.coalesce(1, true).saveAsTextFile("file:///home/myuser/tmp3");

And here are the logs after the count is printed:

Final No. of Output Lines: 260
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/09 11:30:07 INFO spark.SparkContext: Starting job: saveAsTextFile at Main.java:284
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 174 bytes
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Registering RDD 23 (coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Got job 9 (saveAsTextFile at Main.java:284) with 1 output partitions (allowLocal=false)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Final stage: Stage 21(saveAsTextFile at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284), which has no missing parents
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(22392) called with curMem=132730821, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17 stored as values in memory (estimated size 21.9 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(11900) called with curMem=132753213, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 11.6 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode111.mydomain.com:34468 (size: 11.6 KB, free: 5.2 GB)
15/04/09 11:30:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_17_piece0
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.TaskSchedulerImpl: Adding task set 26.0 with 4 tasks
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 26.0 (TID 36, mynode117.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 26.0 (TID 37, mynode112.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 26.0 (TID 38, mynode115.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 26.0 (TID 39, mynode119.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode115.mydomain.com:51126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode117.mydomain.com:33052 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to [email protected]:34724
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to [email protected]:35651
15/04/09 11:30:07 INFO network.ConnectionManager: Accepted connection from [mynode112.mydomain.com/10.211.26.212:52476]
15/04/09 11:30:07 INFO network.SendingConnection: Initiating connection to [mynode112.mydomain.com/10.211.26.212:56453]
15/04/09 11:30:07 INFO network.SendingConnection: Connected to [mynode112.mydomain.com/10.211.26.212:56453], 1 messages pending
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode119.mydomain.com:39126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode112.mydomain.com:56453 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 26.0 (TID 36) in 356 ms on mynode117.mydomain.com (1/4)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 26.0 (TID 38) in 362 ms on mynode115.mydomain.com (2/4)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to [email protected]:42604
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to [email protected]:46239
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 26.0 (TID 37) in 796 ms on mynode112.mydomain.com (3/4)
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 26.0 (TID 39) in 829 ms on mynode119.mydomain.com (4/4)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 26 (coalesce at Main.java:284) finished in 0.835 s
15/04/09 11:30:08 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 26.0, whose tasks have all completed, from pool 
15/04/09 11:30:08 INFO scheduler.DAGScheduler: running: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: waiting: Set(Stage 21)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: failed: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Missing parents for Stage 21: List()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284), which is now runnable
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(53664) called with curMem=132765113, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18 stored as values in memory (estimated size 52.4 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(19192) called with curMem=132818777, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 18.7 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode111.mydomain.com:34468 (size: 18.7 KB, free: 5.2 GB)
15/04/09 11:30:08 INFO storage.BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284)
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 40, mynode112.mydomain.com, ANY, 1353 bytes)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode112.mydomain.com:56453 (size: 18.7 KB, free: 2.1 GB)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 4 to [email protected]:46239
15/04/09 11:30:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 199 bytes
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 40) in 441 ms on mynode112.mydomain.com (1/1)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 21 (saveAsTextFile at Main.java:284) finished in 0.447 s
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool 
15/04/09 11:30:08 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:284, took 1.381897276 s
[myuser@mynode111 ~]$ ls tmp3/
_SUCCESS
[myuser@mynode111 ~]$ 

As a side note, I am doing the processing on HDFS and expecting the final output of saveAsTextFile() on the local FS of the machine from which I launch the app. I hope Spark is not writing it somewhere else (the local FS of some other node).
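
If Spark is in fact writing to each node's local FS, the log above suggests where to look: the single save task (task 0.0 in stage 21.0, TID 40) ran on mynode112, not on mynode111 where I am listing the directory. So the missing part file (or its _temporary leftovers) may be sitting on that node, which could be checked with something like:

[myuser@mynode111 ~]$ ssh mynode112.mydomain.com 'ls -laR /home/myuser/tmp3/'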

A quick update:

I tried writing to HDFS instead of the local FS, and it works fine:

result.coalesce(1, true).saveAsTextFile("hdfs://mynode20.mydomain.com:8020/user/myuser/tmp");

Output:

[myuser@mynode111 ~]$ hadoop fs -ls /user/myuser/tmp
Found 2 items
-rw-r--r--   3 myuser myuser          0 2015-04-09 11:53 /user/myuser/tmp/_SUCCESS
-rw-r--r--   3 myuser myuser      12470 2015-04-09 11:53 /user/myuser/tmp/part-00000
[myuser@mynode111 ~]$ 
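
Since the final output is only 260 lines, another option I am considering is to collect() it to the driver and write it with plain Java I/O, which guarantees the file lands on the machine that launched the app. A rough sketch (the class and method names are just for illustration):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;

    public final class DriverSideSave {
        // Collect a small RDD to the driver and write it with plain Java I/O.
        // Only safe for small results (here ~260 lines), since collect()
        // pulls everything into driver memory.
        static void saveSmallRddLocally(JavaRDD<String> rdd, String path)
                throws IOException {
            List<String> lines = rdd.collect();
            Files.write(Paths.get(path), lines, StandardCharsets.UTF_8);
        }
    }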
asked Apr 07 '15 by Bhushan


2 Answers

I had the same issue. It turned out that my Spark workers were running as the root user while my job was running as another user. When saveAsTextFile is called, the worker first saves the data to a temporary location on disk as root; then the job, running as a different user, tries to move that root-owned temporary data to the final location and fails with a permission error.
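
A quick way to check for this (using the paths from the question, and assuming you have sudo rights) is to look at who owns the temporary task files, and either reclaim the root-owned files or run the workers as the same user that submits the job:

[myuser@myserver ~]$ ls -l tmp/myapp/out/_temporary/0/task_*/
[myuser@myserver ~]$ sudo chown -R myuser:myuser tmp/myapp/out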

answered Oct 20 '22 by elgoog


I ran into the same problem before, then realized that if you run in standalone mode, the driver is run by your user while the executor processes are run by root. The only change you need is:

First, run sbt package as your user, not as root, to create the jar file. I tried running sbt package as root (with sudo), and the assembly jar file ended up somewhere else.

Once you have the assembly jar file, do the spark-submit with sudo:

sudo /opt/spark-2.0/bin/spark-submit \
   --class ... \
   --master .. \
   ...
answered Oct 21 '22 by Tianwen Chu