I have tried to compare the performance of Spark SQL version 1.6 and version 1.5. In a simple case, Spark 1.6 is quite a bit faster than Spark 1.5. However, on a more complex query, in my case an aggregation query with grouping sets, Spark SQL 1.6 is much slower than Spark SQL 1.5. Has anybody noticed the same issue? And, even better, does anyone have a solution for this kind of query?
Here is my code:
case class Toto(
  a: String = f"${(math.random*1e6).toLong}%06.0f",
  b: String = f"${(math.random*1e6).toLong}%06.0f",
  c: String = f"${(math.random*1e6).toLong}%06.0f",
  n: Int = (math.random*1e3).toInt,
  m: Double = math.random*1e3)

// Generate one million random rows and register them as a temporary table.
val data = sc.parallelize(1 to 1e6.toInt).map(i => Toto())
val df: org.apache.spark.sql.DataFrame = sqlContext.createDataFrame(data)
df.registerTempTable("toto")

// Aggregation with a single COUNT(DISTINCT ...) over grouping sets.
val sqlSelect = "SELECT a, b, COUNT(1) AS k1, COUNT(DISTINCT n) AS k2, SUM(m) AS k3"
val sqlGroupBy = "FROM toto GROUP BY a, b GROUPING SETS ((a,b),(a),(b))"
val sqlText = s"$sqlSelect $sqlGroupBy"
val rs1 = sqlContext.sql(sqlText)
rs1.saveAsParquetFile("rs1")
Here are two screenshots of the runs on Spark 1.5.2 and Spark 1.6.0, both with --driver-memory=1G. The DAG on Spark 1.6.0 can be viewed at DAG.
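For reference, here is a minimal sketch of how the same query can be timed end to end from the shell instead of reading durations off the UI; the time helper and the output directory "rs1_timed" are mine, not part of the original test:

// Rough end-to-end timing of the action; reuses sqlText from the snippet above.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
  result
}
time("grouping sets aggregation") {
  sqlContext.sql(sqlText).saveAsParquetFile("rs1_timed") // hypothetical output directory
}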
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. It enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data.
The high-level query language and additional type information make Spark SQL more efficient. Spark SQL also uses in-memory columnar storage, a feature that stores cached data in a columnar format rather than a row format.
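As a small illustrative sketch (assuming the same sqlContext and the "toto" temp table registered above), caching a table puts it into this in-memory columnar format:

sqlContext.cacheTable("toto")                       // cached in the in-memory columnar format
sqlContext.sql("SELECT COUNT(1) FROM toto").show()  // first action materializes the cache
sqlContext.uncacheTable("toto")                     // release the cached columnar data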
Thanks to Herman van Hövell for his reply on the Spark dev mailing list. To share it with other members, I reproduce his response here:
1.6 plans single distinct aggregates like multiple distinct aggregates; this inherently causes some overhead but is more stable in case of high cardinalities. You can revert to the old behavior by setting the spark.sql.specializeSingleDistinctAggPlanning option to false. See also: https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L452-L462
Actually, in order to revert to the old behavior, the setting's value should be "true".
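Concretely, a minimal sketch of applying this before re-running the query (the output directory "rs2" is just an illustration):

// Revert to the Spark 1.5-style plan for single distinct aggregates (per the note above, use "true").
sqlContext.setConf("spark.sql.specializeSingleDistinctAggPlanning", "true")
val rs2 = sqlContext.sql(sqlText)
rs2.saveAsParquetFile("rs2") // hypothetical output path

The same option can also be set from SQL with SET spark.sql.specializeSingleDistinctAggPlanning=true.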