 

Spark jobs finish but application takes time to close

I am running a Spark job written in Scala. As expected, all jobs finish on time, but somehow some INFO logs keep being printed for 20-25 minutes before the application stops.

Posting a few UI screenshots which may help to understand the problem.

  1. Following is the time taken by the 4 stages:

[screenshot: time taken by 4 stages]

  2. Following is the time between consecutive job IDs:

[screenshot: time between consecutive job IDs]

I don't understand why so much time is spent between the two job IDs.

Following is my code snippet:

    val sc = new SparkContext(conf)
    for (x <- 0 to 10) {
      // getFilesList returns (input file links, output path, state for the next iteration)
      val (links, path, nextLin) = getFilesList(lin)
      lin = nextLin
      // Parse the tab-separated input, keep the matching rows, and key by field 34
      val z = sc.textFile(links.mkString(","))
        .map(_.split('\t'))
        .filter(t => t(4) == "xx" && t(6) == "x")
        .map(t => titan2(t))
        .filter(_.length > 35)
        .map(t => (t(34), (t(35), t(5), t(32), t(33))))
      // Way-node lookup table: field 0 -> field 1
      val way_nodes = sc.textFile(way_source).map(_.split(";")).map(t => (t(0), t(1)))
      val result = z.join(way_nodes)
        .map { case (_, ((a, b, c, d), w)) => (b, Array(Array(b, c, d, a, w))) }
        .reduceByKey(_ ++ _)
        .flatMap(t => process(t))
        .combineByKey(createTimeCombiner, timeCombiner, timeMerger)
        .map(averagingFunction)
        .map(t => t._1 + "," + t._2)
      result.saveAsTextFile(path)
    }
    sc.stop()

Some more follow-up: spark-1.4.1 saveAsTextFile to S3 is very slow on emr-4.0.0

Harshit asked Jan 25 '16


1 Answer

As I said in a comment, I recommend using the spark-csv package instead of saveAsTextFile; I have had no problems writing directly to S3 with that package :)
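For reference, here is a minimal sketch of what that could look like with the code from the question, assuming the final map to "key,value" strings is dropped so the pairs reach the write as two columns. The name pairs, the column names, and the bucket path are placeholders, not from the original post:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // pairs stands for the RDD of (key, value) pairs produced by averagingFunction,
    // before the final concatenation into "key,value" strings
    val df = pairs.toDF("key", "value")

    // spark-csv takes care of the CSV formatting and writes straight to S3
    df.write
      .format("com.databricks.spark.csv")
      .save("s3://my-bucket/output/")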

I don't know if you use s3 or s3n, but maybe try to switch. I have experienced problems with s3a on Spark 1.5.2 (EMR-4.2) where writes timed out all the time, and switching back to s3 solved the problem, so it's worth a try.
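If you want to try the switch, note that the scheme is just part of the output URI, so it is a one-line change wherever the path is built (the bucket and prefix below are made up):

    // was: rdd.saveAsTextFile("s3a://my-bucket/output/")
    rdd.saveAsTextFile("s3://my-bucket/output/")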

A couple of other things should also speed up writes to S3. The first is to use the DirectOutputCommitter:

    conf.set("spark.hadoop.mapred.output.committer.class", "com.appsflyer.spark.DirectOutputCommitter")

The second is to disable the generation of _SUCCESS files:

    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Note that disabling _SUCCESS files has to be set on the Hadoop configuration of the SparkContext, not on the SparkConf.
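Putting both settings together, a sketch of where each one lives (the app name is made up):

    import org.apache.spark.{SparkConf, SparkContext}

    // The committer class goes on the SparkConf, before the context is created
    val conf = new SparkConf()
      .setAppName("my-job") // placeholder
      .set("spark.hadoop.mapred.output.committer.class",
        "com.appsflyer.spark.DirectOutputCommitter")

    val sc = new SparkContext(conf)

    // Disabling _SUCCESS files goes on the SparkContext's Hadoop configuration
    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")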

I hope this helps.

Glennie Helles Sindholt answered Sep 22 '22