I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, a few GB in total). I'm running Spark on 8 low-memory machines in a YARN cluster, i.e. something along the lines of:
spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1
The dataset consists of strings of length 500-2000.
I'm trying to do a simple groupByKey (see below), but it fails with a java.lang.OutOfMemoryError: GC overhead limit exceeded exception:
val keyvals = sc.newAPIHadoopFile("hdfs://...")
.map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()
I can count the group sizes using reduceByKey without problems, which reassures me that the problem is caused neither by a single excessively large group nor by an excessive number of groups:
keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
// (key1,139368)
// (key2,35335)
// (key3,392744)
// ...
// (key13,197941)
I've tried repartitioning, reshuffling, and increasing the groupByKey level of parallelism:
keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails
I've also tried playing around with spark.default.parallelism, and increasing spark.shuffle.memoryFraction to 0.8 while lowering spark.storage.memoryFraction to 0.1.
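For reference, here is a minimal sketch of how those properties can be set on the SparkConf when the context is created in application code; the values simply mirror the ones above, the property names are the Spark 1.x ones, and the variable names are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("groupByKey-test")                // illustrative app name
  .set("spark.default.parallelism", "3000")
  .set("spark.shuffle.memoryFraction", "0.8")   // more room for shuffle buffers
  .set("spark.storage.memoryFraction", "0.1")   // less room for cached RDDs
val sc = new SparkContext(conf)

The same properties can equivalently be passed to spark-submit via --conf key=value pairs.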
The failing stage (count) will fail on task 2999 of 3000.
I can't seem to find anything that suggests groupBy shouldn't simply spill to disk instead of keeping things in memory, but I just can't get it to work right, even on fairly small datasets. This should obviously not be the case, and I must be doing something wrong, but I have no idea where to start debugging this!
Patrick Wendell shed some light on the details of the groupBy operator on the mailing list. The takeaway message is the following:
Within a partition things will spill [...] This spilling can only occur across keys at the moment. Spilling cannot occur within a key at present. [...] Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. [...] If the goal is literally to just write out to disk all the values associated with each group, and the values associated with a single group are larger than fit in memory, this cannot be accomplished right now with the groupBy operator.
He further suggests a work-around:
The best way to work around this depends a bit on what you are trying to do with the data downstream. Typical approaches involve sub-dividing any very large groups, for instance, appending a hashed value in a small range (1-10) to large keys. Then your downstream code has to deal with aggregating partial values for each group. If your goal is just to lay each group out sequentially on disk in one big file, you can call sortByKey with a hashed suffix as well. The sort functions are externalized in Spark 1.1 (which is in pre-release).
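To make the suggested salting workaround concrete, here is a minimal sketch, assuming keyvals is an RDD[(String, String)] as in the question; the number of salt buckets and the downstream merge (a simple size count) are illustrative assumptions, not part of the original answer:

// Append a hashed salt in a small range to each key, so that one oversized
// group is split into several smaller sub-groups, each of which fits in memory.
val saltBuckets = 10
val salted = keyvals.map { case (k, v) =>
  ((k, math.abs(v.hashCode) % saltBuckets), v)
}

// Group on the (key, salt) pair instead of the raw key; spilling can now
// happen across the smaller sub-groups.
val partialGroups = salted.groupByKey(3000)

// Downstream code must merge the partial results per original key.
// As an example, recover the total group sizes:
val groupSizes = partialGroups
  .map { case ((k, _), values) => (k, values.size.toLong) }
  .reduceByKey(_ + _)
groupSizes.collect().foreach(println)

If the values for one original key still need to end up laid out together on disk, the same salted key can be fed to sortByKey instead of groupByKey, as the answer suggests.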