 

Spark groupBy OutOfMemory woes

Tags:

apache-spark

I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, few gigs in total). I'm running Spark on 8 low-memory machines in a yarn cluster, i.e. something along the lines of:

spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1

The dataset consists of strings of length 500-2000.

I'm trying to do a simple groupByKey (see below), but it fails with a java.lang.OutOfMemoryError: GC overhead limit exceeded exception:

// Read the input files and map each record to a (key, value) tuple.
val keyvals = sc.newAPIHadoopFile("hdfs://...")
  .map( someobj.produceKeyValTuple )
// Group all values per key, then count the groups; this is the step that OOMs.
keyvals.groupByKey().count()

I can count the group sizes using reduceByKey without problems, reassuring myself that the problem isn't caused by a single excessively large group, nor by an excessive number of groups:

keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
//  (key1,139368)
//  (key2,35335)
//  (key3,392744)
//  ...
//  (key13,197941)

I've tried reformatting, reshuffling and increasing the groupBy level of parallelism:

keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails

I've also tried playing around with spark.default.parallelism, and increasing spark.shuffle.memoryFraction to 0.8 while lowering spark.storage.memoryFraction to 0.1.
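For reference, this is roughly how I passed those settings to spark-submit (a sketch; 3000 for spark.default.parallelism is just one of the values I tried):

spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1 \
  --conf spark.default.parallelism=3000 \
  --conf spark.shuffle.memoryFraction=0.8 \
  --conf spark.storage.memoryFraction=0.1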

The failing stage (count) will fail on task 2999 of 3000.

I can't find anything suggesting that groupBy shouldn't simply spill to disk instead of keeping everything in memory, yet I can't get it to work, even on fairly small datasets. This obviously shouldn't be the case, and I must be doing something wrong, but I have no idea where to start debugging this!

asked Aug 05 '14 by jkgeyti


1 Answer

Patrick Wendell shed some light on the details of the groupBy operator on the mailing list. The takeaway message is the following:

Within a partition things will spill [...] This spilling can only occur across keys at the moment. Spilling cannot occur within a key at present. [...] Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. [...] If the goal is literally to just write out to disk all the values associated with each group, and the values associated with a single group are larger than fit in memory, this cannot be accomplished right now with the groupBy operator.

He further suggests a work-around:

The best way to work around this depends a bit on what you are trying to do with the data down stream. Typically approaches involve sub-dividing any very large groups, for instance, appending a hashed value in a small range (1-10) to large keys. Then your downstream code has to deal with aggregating partial values for each group. If your goal is just to lay each group out sequentially on disk on one big file, you can call sortByKey with a hashed suffix as well. The sort functions are externalized in Spark 1.1 (which is in pre-release).
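To illustrate, here's a minimal sketch of that key-salting idea (the salt range of 10 and the per-group size aggregation are placeholder assumptions; your real downstream merge logic will differ):

// Append a small hashed suffix to each key so that no single (key, salt)
// group has to fit in memory on one executor.
val salted = keyvals.map { case (k, v) => ((k, math.abs(v.hashCode % 10)), v) }

// Each salted group is now roughly a tenth of the original group's size.
val partialGroups = salted.groupByKey(3000)

// Downstream code then merges the partial results per original key;
// here we just sum per-partial-group sizes as a stand-in for real aggregation.
partialGroups
  .map { case ((k, _), vs) => (k, vs.size) }
  .reduceByKey(_ + _)
  .collect()
  .foreach(println)

If the goal is to lay each group out sequentially on disk, sorting on the salted (key, salt) pair with sortByKey gives the layout the quote describes.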

answered Sep 27 '22 by jkgeyti