Spark throws stack overflow error when unioning a lot of RDDs

When I use "++" to combine a large number of RDDs, I get a stack overflow error.

Spark version 1.3.1. Environment: yarn-client, --driver-memory 8G

There are more than 4000 RDDs, each read from a text file of about 1 GB.

The combined RDD is generated like this:

// read each path into its own RDD, then fold them together pairwise with union
val collection = (for (path <- files) yield sc.textFile(path)).reduce(_ union _)

It works fine when files contains only a few paths. Here is the error:

The error pattern repeats itself. I guess a recursive function is being called too many times?

 Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
  .....
asked May 29 '15 by worldterminator

1 Answer

Use SparkContext.union(...) instead to union many RDDs at once.

You don't want to do it one at a time like that, since RDD.union() creates a new step in the lineage (an extra set of stack frames on any computation) for each RDD, whereas SparkContext.union() does it all at once. This ensures you don't get a stack overflow error.
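
For example, a minimal sketch of the suggested fix, assuming files is the same collection of paths as in the question:

// build one RDD per input file first
val rdds = files.map(path => sc.textFile(path))

// union them in a single call: SparkContext.union builds one flat UnionRDD
// over all inputs, so resolving partitions does not recurse once per file
// the way a chain of RDD.union calls does
val collection = sc.union(rdds)

SparkContext.union takes a whole sequence of RDDs, so the resulting lineage has a single union step no matter how many inputs there are.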

answered Nov 18 '22 by Sean Owen