I'm evaluating Spark SQL to implement a simple reporting module (a few simple aggregations over Avro data already stored on HDFS). I have no doubt that Spark SQL could be a good fit for both my functional and non-functional requirements.
However, on top of the production requirements, I want to make sure the module will be testable. We follow a BDD approach with very focused scenarios, which means that this module will need to run tens or hundreds of SQL queries over some very simple data (1..10 records).
To get a rough idea of the performance I can expect from Spark SQL in local mode, I have quickly prototyped a few tests:
select count(*) from myTable
select key, count(*) from myTable group by key
The first test takes 100ms on average, but the second one takes 500ms. Such performance is unacceptable, as it would make the test suite too slow.
For comparison, I can run the same test in 10ms using Crunch and its MemPipeline (1500ms with MRPipeline in local mode), and in about 1500ms with Hive in embedded mode. Spark SQL is thus a bit faster than MR in local mode, but still way too slow to build good test suites.
Is it possible to speed up Spark SQL in local mode?
Is there a better / faster way to test a Spark SQL module ?
(I have not profiled the execution yet, but since a groupBy().countByKey() on an RDD takes 40ms on average, I expect to find that the culprit is the query optimizer; a sketch of that RDD-level comparison is shown after the test code below.)
My quick & dirty test code follows:
import com.google.common.base.Stopwatch;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SparkConf sparkConf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("poc-sparksql");

try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
    SQLContext sqlCtx = new SQLContext(ctx);

    // Test 1: plain count(*), no shuffle involved (~100ms per iteration)
    for (int i = 0; i < ITERATIONS; i++) {
        Stopwatch testCaseSw = new Stopwatch().start();
        DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
        df.registerTempTable("myTable");
        DataFrame result = sqlCtx.sql("select count(*) from myTable");
        System.out.println("Results: " + result.collectAsList());
        System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
    }

    // Test 2: group by, which triggers a shuffle (~500ms per iteration)
    for (int i = 0; i < ITERATIONS; i++) {
        Stopwatch testCaseSw = new Stopwatch().start();
        DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
        df.registerTempTable("myTable");
        DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a");
        System.out.println("Results: " + result.collectAsList());
        System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
    }
}
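For reference, the RDD-level comparison mentioned above looked roughly like the snippet below. It is only a sketch: it reuses sqlCtx and the Avro path from the test code above, and treats the first column as the grouping key purely for illustration.

    // RDD-level baseline: group and count without going through the SQL optimizer.
    // Requires org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.JavaPairRDD
    // and org.apache.spark.sql.Row in addition to the imports above.
    DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
    JavaRDD<Row> rows = df.javaRDD();
    JavaPairRDD<Object, Iterable<Row>> byKey = rows.groupBy(row -> row.get(0));  // first column as key (illustrative)
    System.out.println("Counts by key: " + byKey.countByKey());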
Spark SQL is a highly scalable and efficient relational processing engine with easy-to-use APIs and mid-query fault tolerance. It is a core module of Apache Spark.
Launching too many tasks isn't a good option when the data size is really small.
In your second query, the group by will create another stage with 200 tasks, because you did not set the shuffle partitions property and it defaults to 200, so most of those tasks will be empty.
It might not make a difference in a single test, but it can have a significant impact when you have thousands of tests with shuffle operations.
Set "spark.sql.shuffle.partitions"
to x (where x is local[x]
) in spark conf .
Actually you don't need 4 executors
to handle less than 10 records so better reduce the number of executors to 1
and also set shuffle.paritions
to 1
.
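A minimal sketch of that configuration, reusing the Spark 1.x Java API, the Avro path and the column name from the question (setConf on the SQLContext is one way to set the property; putting it in the SparkConf works as well):

    // Single local thread and a single shuffle partition: enough for 1..10-record test data.
    SparkConf sparkConf = new SparkConf()
            .setMaster("local[1]")        // no need for 4 threads with so few records
            .setAppName("poc-sparksql");

    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        SQLContext sqlCtx = new SQLContext(ctx);
        sqlCtx.setConf("spark.sql.shuffle.partitions", "1");  // instead of the default 200

        DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
        df.registerTempTable("myTable");
        System.out.println(sqlCtx.sql("select a, count(*) from myTable group by a").collectAsList());
    }

With a single shuffle partition, the group by stage no longer fans out into 200 mostly empty tasks.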