How to resolve Apache Spark StackOverflowError after multiple unions

I have a Spark Scala program which uses a REST API to get data batch by batch, and once all the data is retrieved I operate on it.

Current Program:

  • For each batch, create an RDD and merge it with the RDD accumulated from the previous API calls using rdd.union(currentRdd).

  • Operate on the final RDD.

A simple program to reproduce the issue:

    import org.apache.spark.{SparkConf, SparkContext}

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
      val sc = new SparkContext(conf)
      val limit = 1000
      // Start from an empty RDD and union one small batch per iteration
      var rdd = sc.emptyRDD[Int]
      for (x <- 1 to limit) {
        val currentRdd = sc.parallelize(x to x + 3)
        rdd = rdd.union(currentRdd) // each union adds another level to the lineage
      }
      println(rdd.sum())
    }

Problem: when the number of batches is high, the program throws a StackOverflowError:

    Exception in thread "main" java.lang.StackOverflowError
        at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply

I assume that as the number of batches increases, the RDD dependency graph becomes very deep, and traversing it is what throws the error.
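
One way to see this (a quick check, not part of the original program) is to print the lineage of the final RDD from the loop above; with limit = 1000 the output contains a nested UnionRDD entry for every iteration:

    // Prints the full lineage; each rdd.union(currentRdd) in the loop
    // adds another UnionRDD level that has to be traversed recursively.
    println(rdd.toDebugString)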

What is the best way to resolve this problem?

1 Answer

There is already SparkContext.union, which knows how to properly compute a union of multiple RDDs:

    val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
    val rdd = sc.union(rdds)
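
Applied to the batch-by-batch flow in the question, a sketch (reusing sc.parallelize from the reproducer in place of the real REST call) would be to collect one RDD per batch and union them all in a single call at the end:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.rdd.RDD

    // Collect one RDD per batch instead of chaining rdd.union(currentRdd)
    val batches = ArrayBuffer.empty[RDD[Int]]
    for (x <- 1 to limit) {
      batches += sc.parallelize(x to x + 3) // stand-in for the per-batch API call
    }
    // A single union over all batches keeps the lineage shallow
    val rdd = sc.union(batches.toSeq)
    println(rdd.sum())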

Alternatively, you could try using a helper function like balancedReduce (sketched below) to avoid creating a long chain of unions:

    val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
    val rdd = balancedReduce(rdds)(_ union _)

The reason why it should work is essentially the same as in the linked answer: an O(n) chain of unions blows the stack, while an O(log(n))-deep binary tree of unions doesn't.
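
balancedReduce itself isn't shown in this excerpt; a minimal sketch of such a helper (hypothetical name and signature, pairing up neighbouring elements until one remains) could look like this:

    // Hypothetical helper: reduces a list with an O(log n)-deep binary tree
    // of operations instead of an O(n) left-to-right chain.
    def balancedReduce[A](xs: List[A])(op: (A, A) => A): A = xs match {
      case Nil      => throw new IllegalArgumentException("empty list")
      case x :: Nil => x
      case _ =>
        // Pair up adjacent elements, then recurse on the half-sized list
        val paired = xs.grouped(2).map {
          case List(a, b) => op(a, b)
          case List(a)    => a
        }.toList
        balancedReduce(paired)(op)
    }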
