
Working Around Performance & Memory Issues with spark-sql GROUP BY

Consider the following example of running a GROUP BY with a relatively large number of aggregations and a relatively large number of groups:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._

val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt

case class Data(A: Long = (math.random*num_groups).toLong)

val table = (1 to num_rows).map(i => Data()).toDF

val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")

// Write the result to make sure everything is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")

The input of this job is just 8MB, the output is around 2.4GB, and I am running this on a cluster of three worker machines with 61GB of memory each. The result: all workers crash with OutOfMemory exceptions. Even with lower values for num_columns, the job becomes unreasonably slow due to GC overhead.

Things we tried include:

  • reducing the partition size (reduces the memory footprint but increases the bookkeeping overhead)
  • pre-partitioning the data with a HashPartitioner before doing the aggregation (reduces the memory consumption but requires a full reshuffle before any real work happens)

Are there better ways to achieve the desired effect?

DanielM asked May 19 '15 09:05



1 Answer

Generally speaking, an almost universal solution to problems like this one is to keep partitions at a reasonable size. While "reasonable" is somewhat subjective and can vary from case to case, 100-200MB looks like a good place to start.
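To make the 100-200MB guideline concrete, here is a rough back-of-the-envelope sketch in plain Scala. The helper partitionsFor is hypothetical (it is not part of Spark), and measuring against the roughly 2.4GB output reported in the question is an assumption; depending on the job, the input or shuffle size may be the more relevant number.

```scala
// Hypothetical helper, not a Spark API: estimate a partition count
// from a total data size and a target partition size.
def partitionsFor(totalBytes: Long, targetBytes: Long): Int = {
  require(totalBytes >= 0 && targetBytes > 0, "sizes must be non-negative and target positive")
  // Ceiling division, with a floor of one partition.
  math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
}

val MB = 1024L * 1024L

// Assumption: use the ~2.4GB output size from the question as the reference.
val outputBytes = 2400L * MB

val atUpperTarget = partitionsFor(outputBytes, 200L * MB) // 12 partitions of ~200MB
val atLowerTarget = partitionsFor(outputBytes, 100L * MB) // 24 partitions of ~100MB
```

Treat the result as a starting point rather than a rule; using more partitions than this estimate simply makes each one smaller.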

I can easily aggregate the example data you've provided on a single worker, keeping the default spark.executor.memory (1GB) and limiting total available resources to 8 cores and 8GB RAM. All of that using 50 partitions, with aggregation time around 3 seconds and without any special tuning (this is more or less consistent across versions 1.5.2 to 2.0.0).

So to summarize: either increase spark.default.parallelism or explicitly set the number of partitions when creating the DataFrame, if possible. The default spark.sql.shuffle.partitions should be enough for a small dataset like this one.
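As a minimal sketch of the second option, assuming Spark 2.x with SparkSession (the question itself uses the older HiveContext API), the number of partitions can be passed to parallelize when the DataFrame is built. The object name, the local master, the view name "data" and the output path are illustrative assumptions, not something taken from the original setup, and no timing is implied.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Random

object GroupByWithPartitions {
  // Defined at object level so Spark can derive an encoder for it.
  case class Data(a: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("group-by-with-partitions")
      .master("local[*]") // assumption: local run; drop this when submitting to a cluster
      .getOrCreate()
    import spark.implicits._

    val numColumns = 3000
    val numRows = 1000000
    val numGroups = 100000
    val numPartitions = 50 // the partition count used in the answer

    val rows = (1 to numRows).map(_ => Data(Random.nextInt(numGroups).toLong))

    // Explicitly set the number of partitions when creating the DataFrame.
    // Alternative: set spark.default.parallelism in the Spark configuration instead.
    val table = spark.sparkContext.parallelize(rows, numPartitions).toDF()
    table.createOrReplaceTempView("data")

    val aggregations = (1 to numColumns).map(i => s"count(1) as agg_$i")
    val result = spark.sql(
      s"select a, ${aggregations.mkString(", ")} from data group by a")

    // Write the result to force execution; the path is a placeholder.
    result.write.mode("overwrite").parquet("result.parquet")

    spark.stop()
  }
}
```

The shuffle side of the aggregation is left at the default spark.sql.shuffle.partitions here, in line with the summary above.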

zero323 answered Sep 21 '22 19:09
