I am testing the default Spark PageRank example with the number of iterations set to 1024, and it throws a StackOverflowError. I have also hit this problem in another program of mine. How can I solve it?
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    var iters = args(2).toInt
    val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))
    val lines = ctx.textFile(args(1), 1)
    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)
    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }
    val output = ranks.collect()
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
    System.exit(0)
  }
}
Here is the error:
[spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
java.lang.StackOverflowError
at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)
This happens because the transformations inside the for-loop build up a very long lineage (dependency chain) in your RDD. When the job is submitted, the DAGScheduler visits that lineage recursively, and with 1024 iterations the recursion overflows the stack.

To solve this, use checkpoint() on your RDD. Note that cache() alone does not evaluate the RDD immediately, so after a certain number of iterations you should call cache() and checkpoint() on the intermediate RDD and then manually evaluate it (by running an action) to clear its dependencies.
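For example, a minimal sketch of the loop with periodic checkpointing could look like this (the checkpoint directory, the interval of 100 iterations, and the count() call used to force evaluation are illustrative choices, not part of the original example):

ctx.setCheckpointDir("/tmp/spark-checkpoints")   // directory for checkpointed RDDs (assumed path); must be set before checkpoint()

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  if (i % 100 == 0) {    // illustrative interval; tune to your job
    ranks.cache()        // keep the partitions in memory so checkpointing does not recompute them
    ranks.checkpoint()   // mark the RDD to be saved to the checkpoint dir, which truncates its lineage
    ranks.count()        // run an action so the RDD is materialized and the checkpoint actually happens
  }
}

Checkpointing writes the RDD's data to the checkpoint directory and replaces its long dependency chain with a single dependency on the checkpointed files, so the scheduler no longer has to walk through hundreds of iterations of lineage.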