I'm using Spark to calculate the PageRank of user reviews, but I keep getting java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When I run the code on a small number of entries it works fine, though.
Entry example:
product/productId: B00004CK40 review/userId: A39IIHQF18YGZA review/profileName: C. A. M. Salas review/helpfulness: 0/0 review/score: 4.0 review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.
The Code:
public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {
        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            return new Tuple2<String, String>(data[0], data[1]);
        }
    }).groupByKey().cache();

    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();

    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        } else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            if (iterCounter % 20 == 0) {
                finalCartesian.checkpoint();
            }
        }
    }

    JavaRDD<Tuple2<String, String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));
    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));

    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });

    try {
        // calculate pagerank using this
        // https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        e.printStackTrace();
    }
    sc.close();
}
Increase the thread stack size (-Xss). Increasing the stack size can be useful, for example, when the program calls a large number of methods or uses many local variables. Setting the thread stack size to 4 MB should prevent the JVM from throwing a java.lang.StackOverflowError.
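As a minimal sketch of how that could be wired into a Spark job (the 4m value and app name are just examples; the exact way the options take effect depends on your deploy mode, so treat this as an assumption about your setup):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch: raise the executor thread stack size to 4 MB.
// The driver's own stack size usually has to be set when its JVM starts,
// e.g. spark-submit --driver-java-options "-Xss4m" or java -Xss4m on the command line.
SparkConf conf = new SparkConf()
        .setAppName("pagerank")
        .set("spark.executor.extraJavaOptions", "-Xss4m");
JavaSparkContext sc = new JavaSparkContext(conf);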
A java.lang.StackOverflowError indicates a serious problem that an application cannot sensibly catch: the thread's call stack has run out of space. It is usually caused by a recursive call with no terminating condition.
A stack overflow is a type of buffer overflow error that occurs when a program tries to use more space in the call stack than has been allocated to it.
The most common way to get a StackOverflowError is long or infinite recursion in a recursive function. You can often avoid deep recursion by changing your design to use an explicit, stackable data structure instead.
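For illustration only (plain Java, unrelated to Spark), a minimal example of the difference: deep recursion consumes one call-stack frame per call and eventually overflows, while the same work kept in a Deque lives on the heap.

import java.util.ArrayDeque;
import java.util.Deque;

public class StackOverflowDemo {
    // Deep recursion: each call uses a stack frame, so a large n
    // overflows the default thread stack and throws StackOverflowError.
    static long sumRecursive(long n) {
        if (n == 0) return 0;
        return n + sumRecursive(n - 1);
    }

    // Same computation with a "stackable data object":
    // the pending work is stored on the heap in a Deque, not on the call stack.
    static long sumIterative(long n) {
        Deque<Long> work = new ArrayDeque<>();
        for (long i = 1; i <= n; i++) work.push(i);
        long total = 0;
        while (!work.isEmpty()) total += work.pop();
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sumIterative(1_000_000));   // fine
        System.out.println(sumRecursive(1_000_000));   // likely StackOverflowError
    }
}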
I have multiple suggestions which will help you to greatly improve the performance of the code in your question.
An example is RDD.count: to tell you the number of lines in a file, the file needs to be read. So if you write RDD.count, at that point the file will be read, the lines will be counted, and the count will be returned. What if you call RDD.count again? The same thing: the file will be read and counted again. So what does RDD.cache do? It tells Spark to keep the RDD's data around after it is first computed. Now, if you run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache: it will just take the data from the cache and count the lines, with no recomputation. Read more about caching in the Spark documentation on RDD persistence.
In your code sample you are not reusing anything that you've cached, so you can remove the .cache calls.
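To make that concrete, here is a minimal sketch (it assumes the sc and inputFileName from your code):

JavaRDD<String> lines = sc.textFile(inputFileName);   // lazy: nothing is read yet

long first = lines.count();    // reads the file and counts the lines
long again = lines.count();    // reads and counts the whole file again

JavaRDD<String> cached = sc.textFile(inputFileName).cache();
long c1 = cached.count();      // reads the file, caches the partitions, counts
long c2 = cached.count();      // served from the cache, no recomputation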
Combine your rddFileData, rddMovieData and rddPairReviewData steps so that the parsing happens in one go (see the sketch below). Get rid of .collect, since that brings all the results back to the driver and may be the actual reason for your error.
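One possible sketch (not a drop-in replacement, and the pair-generation strategy is my assumption about your intent): parsing, grouping, and pair generation all stay in a single distributed pipeline, with no collect() and no driver-side loop.

// Requires: import java.util.ArrayList; import java.util.List; import scala.Tuple2;
// Parse and group in one pass: movieId -> all userIds that reviewed it.
JavaPairRDD<String, Iterable<String>> usersByMovie = sc.textFile(inputFileName)
        .mapToPair(line -> {
            String[] data = line.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return new Tuple2<>(movieId, userId);
        })
        .groupByKey();

// Emit every ordered pair of distinct users who reviewed the same movie,
// already formatted as "userA userB" edges for the PageRank step.
JavaRDD<String> userIdPairsString = usersByMovie.flatMap(t -> {
    List<String> pairs = new ArrayList<>();
    for (String u1 : t._2()) {
        for (String u2 : t._2()) {
            if (!u1.equals(u2)) {
                pairs.add(u1 + " " + u2);
            }
        }
    }
    return pairs.iterator(); // Spark 2.x flatMap expects an Iterator; 1.x expects an Iterable
});

The distinct-user check replaces the later compareTo filter, and userIdPairsString can then be handed to JavaPageRank.calculatePageRank as before.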