Spark: PageRank example when iteration too large throws stackoverflowError

I tested Spark's default PageRank example with the number of iterations set to 1024, and it throws a StackOverflowError. I have also hit this problem in another program of mine. How can I solve it?

object SparkPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    var iters = args(2).toInt
    val ctx = new SparkContext(args(0), "PageRank",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val lines = ctx.textFile(args(1), 1)

    // Build the link graph: (url, outgoing neighbours).
    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()

    // Start every page with rank 1.0.
    var ranks = links.mapValues(v => 1.0)

    for (i <- 1 to iters) {
      // Each page spreads its rank evenly over its outgoing links.
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      // PageRank update with damping factor 0.85.
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }

    val output = ranks.collect()
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))

    System.exit(0)
  }
}

Here is the error:

[spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
java.lang.StackOverflowError
    at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
    at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
    at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
    at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
    at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)
asked Mar 11 '14 by user3405300
1 Answer

This happens because the transformations inside the for-loop build a very long lineage (dependency chain) for your RDD. When you submit the Spark job, the DAGScheduler visits that lineage recursively, and with 1024 iterations the recursion overflows the stack.

To solve this, use checkpoint() on your RDD. Note that cache() alone does not help, because it does not evaluate the RDD immediately.

So you should call cache() and checkpoint() on the intermediate RDD every certain number of iterations, and then run an action on it to materialize the checkpoint and truncate its lineage, as sketched below.
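
For example, here is a minimal sketch of the loop with checkpointing added. The checkpoint directory, the interval of 20 iterations, and the count() action are my own illustrative choices, not part of the original example:

// Assumption: ctx, links, ranks and iters are defined as in the question's code.
ctx.setCheckpointDir("/tmp/spark-checkpoints")   // any reliable directory (HDFS on a real cluster)

val checkpointInterval = 20                      // illustrative: checkpoint every 20 iterations

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

  if (i % checkpointInterval == 0) {
    ranks.cache()        // keep the partitions around so the checkpoint job does not recompute everything
    ranks.checkpoint()   // mark the RDD for checkpointing; this by itself is lazy
    ranks.count()        // run an action now, which materializes the checkpoint and cuts the lineage
  }
}

This keeps the lineage that the DAGScheduler has to traverse bounded by the checkpoint interval instead of growing with every iteration.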

answered Oct 23 '22 by viirya