 

How to speed up Spark SQL unit tests?

I'm evaluating Spark SQL to implement a simple reporting module (a few simple aggregations over Avro data already stored on HDFS). I have no doubt that Spark SQL would be a good fit for both my functional and non-functional requirements.

However, on top of the production requirements I want to make sure the module is testable. We follow a BDD approach with very focused scenarios, which means this module will need to run tens or hundreds of SQL queries over some very simple data sets (1..10 records).

To get a rough idea of the performance I can expect from Spark SQL in local mode, I have quickly prototyped a few tests:

  1. select count(*) from myTable
  2. select key, count(*) from myTable group by key

The first test takes 100ms on average, but the second one takes 500ms. Such performance is unacceptable, as it would make the test suite too slow.

For comparison, I can run the same test in 10ms using Crunch and its MemPipeline (1500ms with MRPipeline in local mode), and also in 1500ms with Hive in embedded mode. Spark SQL is thus a bit faster than MR in local mode, but still way too slow to build good test suites.

Is it possible to speed up Spark SQL in local mode?

Is there a better / faster way to test a Spark SQL module?

(I have not profiled the execution yet, but since a groupBy().countByKey() on an RDD takes 40ms on average, I expect to find that the culprit is the query optimizer.)
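
For reference, the RDD-level comparison I mean looks roughly like this (a sketch reusing the sqlCtx from my test code below; grouping on column "a" is just the key from my second query):

    // Roughly equivalent RDD-level measurement: group rows by column "a" and count per key.
    DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
    System.out.println("Counts per key: " + df.javaRDD()
            .groupBy(row -> row.get(row.fieldIndex("a")))
            .countByKey());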


My quick & dirty test code follows:

    SparkConf sparkConf = new SparkConf()
            .setMaster("local[4]")
            .setAppName("poc-sparksql");

    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        SQLContext sqlCtx = new SQLContext(ctx);

        // Test 1: plain count(*)
        for (int i = 0; i < ITERATIONS; i++) {
            Stopwatch testCaseSw = new Stopwatch().start();

            DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
            df.registerTempTable("myTable");
            DataFrame result = sqlCtx.sql("select count(*) from myTable");

            System.out.println("Results: " + result.collectAsList());
            System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
        }

        // Test 2: group by + count
        for (int i = 0; i < ITERATIONS; i++) {
            Stopwatch testCaseSw = new Stopwatch().start();

            DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
            df.registerTempTable("myTable");
            DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a");

            System.out.println("Results: " + result.collectAsList());
            System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
        }
    }
asked Nov 29 '15 by Clément MATHIEU



1 Answer

Launching too many tasks isn't a good option when the data size is really small.
In your second query, the group by creates another stage with 200 tasks, because you didn't set the shuffle partitions property and it defaults to 200; most of those tasks will be empty.

It might not make a difference in a single test, but it can have a significant impact when you have thousands of tests with shuffle operations.

Set "spark.sql.shuffle.partitions" to x (where x is local[x]) in spark conf .

Actually, you don't need 4 executor threads to handle fewer than 10 records, so reduce the number to 1 and also set spark.sql.shuffle.partitions to 1.
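
For example, your test setup could be adapted roughly like this (a sketch reusing the classes from your snippet; local[1] and a single shuffle partition are just the minimal values for 1..10 records):

    SparkConf sparkConf = new SparkConf()
            .setMaster("local[1]")                     // one thread is plenty for 1..10 records
            .setAppName("poc-sparksql")
            .set("spark.sql.shuffle.partitions", "1"); // avoid 200 mostly-empty shuffle tasks

    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        SQLContext sqlCtx = new SQLContext(ctx);

        DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
        df.registerTempTable("myTable");

        // The group-by query now runs its shuffle stage with a single task.
        System.out.println(sqlCtx.sql("select a, count(*) from myTable group by a").collectAsList());
    }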

answered Sep 19 '22 by Belwal