 

Apache Spark job failed immediately without retry, setting maxFailures doesn't work

I was testing a web crawling/scraping program on Apache Spark locally on my computer.

The program uses a few RDD transformations that take a volatile function which sporadically fails. (The function's purpose is to turn URL links into web pages; sometimes the headless browser it invokes just blacks out or gets overloaded - I can't avoid that.)
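
In rough outline the transformation looks like the sketch below; fetchPage here is a hypothetical stand-in for the real headless-browser call in spookystuff, shown only to illustrate the failure mode:

import org.apache.spark.rdd.RDD

// hypothetical helper: the real version drives a headless browser and sometimes
// throws (e.g. a Selenium TimeoutException) when the browser stalls or is overloaded
def fetchPage(url: String): String = {
  ???  // stub - not the actual spookystuff code
}

// sc is a SparkContext built from the configuration shown below
val urls: RDD[String] = sc.parallelize(Seq("http://example.com/a", "http://example.com/b"))
val pages: RDD[String] = urls.map(fetchPage)  // any task may fail sporadically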

I heard that Apache Spark has powerful failover and retry features: any unsuccessful transformation or lost data can be recalculated from scratch from whatever resources it can find (sounds like magic, right?), so I didn't put any failover or try-catch in my code.

This is my Spark configuration:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

Unfortunately the job failed after the majority of stages and individual tasks had succeeded. The latest log in the console shows:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

Looks like Spark just gives up cowardly after failing once. How do I configure it properly to make it more tenacious?

(My program can be downloaded from https://github.com/tribbloid/spookystuff - sorry for the scarce and disorganized code/documentation, I only started it a few days ago.)

ADD: if you want to try it yourself, the following code demonstrates the problem:

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Spark Pi")
  conf.setMaster("local[*]")
  conf.setSparkHome(System.getenv("SPARK_HOME"))
  conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
  conf.set("spark.task.maxFailures", "400000")
  val sc = new SparkContext(conf)
  val slices = if (args.length > 0) args(0).toInt else 8
  val n = 100000 * slices
  val count = sc.parallelize(1 to n, slices).map { i =>
    val x = java.lang.Math.random()
    if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
    x
  }.reduce(_ + _)
  sc.stop()
  println("finished")
}

It should be noted that the same IllegalStateException got retried 32 times in this post: Apache Spark Throws java.lang.IllegalStateException: unread block data

asked by tribbloid


1 Answer

I know it's a very old question, but I had exactly the same problem and came across this question while looking for a solution.

There are 3 master URL formats for submitting a Spark application in local mode:

  • local - one thread (no parallelism), no retries
  • local[K] (or local[*]) - uses K (or the number of cores) worker threads and sets task.maxFailures to 1
  • local[K, F] (or local[*, F]) - sets task.maxFailures=F, and this is what we were after (see the example below)

Consult the Spark documentation for details.
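
For example, a minimal sketch based on the asker's configuration (only the master URL changes; 40 retries per task is an arbitrary illustrative value):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MoreLinkedIn")
// local[*,40]: use all cores and allow each task to fail up to 40 times before the
// stage is aborted; with plain local[*] the limit is effectively 1, as noted above
conf.setMaster("local[*,40]")
val sc = new SparkContext(conf)

With that master URL the sporadic exceptions are retried instead of aborting the job on the first failure.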

answered by botchniaque