Consider the following example of running a GROUP BY
with a relatively large number of aggregations and a relatively large number of groups:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
// ~3,000 aggregation columns, 1M rows, ~100,000 distinct groups
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
// Each row carries a random group key in [0, num_groups)
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everything is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
The input of this job is just 8MB and the output is around 2.4GB, and I am running it on a cluster with three worker machines with 61GB of memory each. The result: all workers crash with OutOfMemory exceptions. Even with lower values for num_columns the job becomes unreasonably slow due to GC overhead.
We have already tried a number of things without success. Are there better ways to achieve the desired effect?
Generally speaking, an almost universal solution to problems like this one is to keep the partition size reasonable. While "reasonable" is slightly subjective and can vary from case to case, 100-200MB looks like a good place to start.
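For example, a back-of-the-envelope sizing sketch based on the ~2.4GB output mentioned in the question (the byte figures and the 128MB target are assumptions, not measurements):

// Rough sizing sketch: target ~128MB per partition (middle of the 100-200MB range).
val targetPartitionBytes = 128L * 1024 * 1024
val estimatedShuffleBytes = 2.4e9.toLong   // ~2.4GB output mentioned in the question
val suggestedPartitions = math.max(1, (estimatedShuffleBytes / targetPartitionBytes).toInt)
// suggestedPartitions comes out around 17 here, well below the default spark.sql.shuffle.partitions of 200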
I can easily aggregate the example data you've provided on a single worker, keeping the default spark.executor.memory (1GB) and limiting total available resources to 8 cores and 8GB of RAM. All of that by using 50 partitions and keeping the aggregation time around 3 seconds, without any special tuning (this is more or less consistent between 1.5.2 and 2.0.0).
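A minimal sketch of that setup, reusing the names from the question; the partition count of 50 is the one quoted above and is illustrative rather than carefully tuned:

// Spread the input over 50 partitions before running the shuffle-heavy aggregation.
val repartitioned = table.repartition(50)
repartitioned.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")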
So to summarize: either increase spark.default.parallelism or explicitly set the number of partitions when creating the DataFrame, if possible. The default spark.sql.shuffle.partitions should be enough for a small dataset like this one.
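A minimal sketch of both options, reusing Data and num_rows from the question (the value 50 is purely illustrative):

// Option 1: raise the default parallelism for the whole application
// (pass this SparkConf to the SparkContext, or use --conf at submit time).
import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.default.parallelism", "50")

// Option 2: set the number of partitions explicitly when creating the DataFrame.
val table = sc.parallelize((1 to num_rows).map(i => Data()), 50).toDF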