I'm running a Spark job to aggregate data. I have a custom data structure called a Profile, which basically contains a mutable.HashMap[Zone, Double]. I want to merge all profiles that share a given key (a UUID), with the following code:
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1 }

val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()
Curiously, Spark fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
The obvious solution is to increase "spark.driver.maxResultSize", but two things puzzle me:

1. It seems too much of a coincidence that the reported total (1024.0 MB) is "bigger than" a limit that prints as exactly the same value (1024.0 MB).
2. Everything I found about this error and this setting says it applies to actions that bring results back to the driver (e.g. take() or collect()), but I'm not taking ANYTHING to the driver, just reading from HDFS, aggregating, and saving back to HDFS.

Does anyone know why I'm getting this error?
Yes, it's failing because the values you see in the exception message are rounded to one decimal place, while the actual comparison happens in bytes. The serialized result size must be more than 1024.0 MB and less than 1024.1 MB.

Check the Apache Spark code snippets below; it's an interesting and fairly rare way to hit this error. :)

Here totalResultSize > maxResultSize; both are Long values holding sizes in bytes, but msg holds the rounded strings produced by Utils.bytesToString().
//TaskSetManager.scala
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
  totalResultSize += size
  calculatedTasks += 1
  if (maxResultSize > 0 && totalResultSize > maxResultSize) {
    val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
      s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
      s"(${Utils.bytesToString(maxResultSize)})"
    logError(msg)
    abort(msg)
    false
  } else {
    true
  }
}
Apache Spark 1.3 - source
//Utils.scala
def bytesToString(size: Long): String = {
  val TB = 1L << 40
  val GB = 1L << 30
  val MB = 1L << 20
  val KB = 1L << 10

  val (value, unit) = {
    if (size >= 2*TB) {
      (size.asInstanceOf[Double] / TB, "TB")
    } else if (size >= 2*GB) {
      (size.asInstanceOf[Double] / GB, "GB")
    } else if (size >= 2*MB) {
      (size.asInstanceOf[Double] / MB, "MB")
    } else if (size >= 2*KB) {
      (size.asInstanceOf[Double] / KB, "KB")
    } else {
      (size.asInstanceOf[Double], "B")
    }
  }
  "%.1f %s".formatLocal(Locale.US, value, unit)
}
Apache Spark 1.3 - source
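To make the rounding concrete, here is a small standalone sketch (my own numbers, not taken from the question) showing how a totalResultSize that is strictly larger than a 1 GB maxResultSize can still print as the same "1024.0 MB":

import java.util.Locale

object RoundingDemo {
  // Same formatting logic as Utils.bytesToString above, trimmed to the MB case.
  def bytesToString(size: Long): String = {
    val MB = 1L << 20
    "%.1f %s".formatLocal(Locale.US, size.toDouble / MB, "MB")
  }

  def main(args: Array[String]): Unit = {
    val maxResultSize   = 1L << 30                // spark.driver.maxResultSize = 1g -> 1073741824 bytes
    val totalResultSize = maxResultSize + 50000L  // hypothetical: just over the limit

    println(totalResultSize > maxResultSize)      // true  -> the stage is aborted
    println(bytesToString(totalResultSize))       // 1024.0 MB (1073791824 / 1048576 ~= 1024.05)
    println(bytesToString(maxResultSize))         // 1024.0 MB -> both sides of the message match
  }
}

So any total between 1 GB plus one byte and just under 1024.05 MB exceeds the limit while printing identically to it, which is exactly the situation in the error message above.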