 

Spark SQL performance

My code's algorithm is as follows.
Step 1. Load the data of one HBase entity into hBaseRDD

      JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
              jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
                      ImmutableBytesWritable.class, Result.class);

Step 2. Transform hBaseRDD into rowPairRDD

     // in rowPairRDD the key is the HBase row key and the Row is the HBase row data
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
             .mapToPair(***);
     // repartition() returns a new RDD, so the result has to be kept
     rowPairRDD = rowPairRDD.repartition(500);
     rowPairRDD.cache();

Step 3. Transform rowPairRDD into a schemaRDD and register it as a table

            JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
            JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
            schemaRDD.registerTempTable("testentity");
            sqlContext.sqlContext().cacheTable("testentity");

Step 4. Use Spark SQL to run the first simple SQL query.

    JavaSchemaRDD retRDD = sqlContext.sql(
            "SELECT column1, column2 FROM testentity WHERE column3 = 'value1'");
    List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Step 5. Use Spark SQL to run the second simple SQL query.

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity 
                                     WHERE column3 = 'value2' ") 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

Step 6. Use Spark SQL to run the third simple SQL query.

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' "); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

The test results are as follows:

Test case 1:

When I insert 300,000 records into the HBase entity and then run the code:

  • the 1st query needs 60407 ms
  • the 2nd query needs 838 ms
  • the 3rd query needs 792 ms

If I use the HBase API to do a similar query, it only takes 2000 ms, so the last two Spark SQL queries are clearly much quicker than the HBase API query.
I believe the 1st Spark SQL query spends most of its time loading the data from HBase,
which is why it is much slower than the last two queries. I think this result is expected.

Test case 2:

When I insert 400,000 records into the HBase entity and then run the code:

  • the 1st query needs 87213 ms
  • the 2nd query needs 83238 ms
  • the 3rd query needs 82092 ms

If I use the HBase API to do a similar query, it only takes 3500 ms, so all three Spark SQL queries are clearly much slower than the HBase API query.
The last two Spark SQL queries are also very slow, with performance similar to the first query. Why? How can I tune the performance?

asked Dec 25 '14 by simafengyun


1 Answer

I suspect you are trying to cache more data than you have allocated to your Spark instance. I'll try to break down what is going on in each execution of the exact same query.

First of all, everything in Spark is lazy. This means that when you call rdd.cache(), nothing actually happens until you do something with the RDD.
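
To make that concrete, here is a minimal sketch of the laziness (my own illustration, not code from the question; it assumes jsc is the JavaSparkContext from Step 1 and uses a hypothetical text file as input):

    import org.apache.spark.api.java.JavaRDD;

    JavaRDD<String> lines = jsc.textFile("hdfs:///some/input"); // hypothetical path; nothing is read yet
    lines.cache();               // only marks the RDD for caching, still nothing happens
    long first = lines.count();  // first action: the data is read AND stored in the cache here
    long second = lines.count(); // second action: served from the in-memory cache, much faster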

First Query

  1. Full HBase scan (slow)
  2. Increase number of partitions (causes shuffle, slow)
  3. Data is actually cached into memory at this point, since caching is also lazy (kind of slow)
  4. Apply where predicate (fast)
  5. Results are collected

Second/Third Query

  1. Full in-memory scan (fast)
  2. Apply where predicate (fast)
  3. Results are collected

Now, Spark will try to cache as much of an RDD as possible. If it can't cache the whole thing, you may run into some serious slowdowns, especially if one of the steps before caching causes a shuffle. In that case you may be repeating steps 1-3 of the first query for every subsequent query. That's not ideal.
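
One mitigation (my suggestion, not part of the original answer): instead of the plain cache() from Step 2, persist the RDD with a storage level that spills to disk, so partitions that do not fit in memory are read back from local disk rather than recomputed from the HBase scan:

    import org.apache.spark.storage.StorageLevel;

    // Use this in place of rowPairRDD.cache(): partitions that do not fit in
    // memory are written to local disk and read back from there, instead of
    // re-running the HBase scan and the shuffle on every query.
    rowPairRDD.persist(StorageLevel.MEMORY_AND_DISK());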

To see whether the RDD is fully cached, go to your Spark Web UI (http://localhost:4040 if in local standalone mode) and look at the RDD storage/persistence information. Make sure it shows 100% cached.
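
If you prefer to check this programmatically instead of in the UI, something along these lines should work (a sketch only; it assumes jsc is the JavaSparkContext from the question, and getRDDStorageInfo is a developer API):

    import org.apache.spark.storage.RDDInfo;

    // Print how many partitions of each persisted RDD are actually held in the cache.
    for (RDDInfo info : jsc.sc().getRDDStorageInfo()) {
        System.out.println(info.name() + ": "
                + info.numCachedPartitions() + "/" + info.numPartitions()
                + " partitions cached, " + info.memSize() + " bytes in memory");
    }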

Edit (per comments):

My 400,000 records take up only about 250 MB in HBase. Why do I need 2 GB to fix the issue (1 GB >> 250 MB)?

I can't say for certain why you hit your max limit with spark.executor.memory=1G, but I will add some more relevant information about caching.

  • Spark only allocates a percentage of the executor's heap memory to caching. By default, this is spark.storage.memoryFraction=0.6 or 60%. So you are really only getting 1GB * 0.6.
  • The total space used in HBase likely differs from the heap space taken when caching in Spark. By default, Spark does not serialize Java objects when storing them in memory, so there is a decent amount of overhead from the Java object metadata. You can change the default persistence level (see the sketch below this list).
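
A sketch of both knobs in the Java API (the property names are the Spark 1.x defaults discussed above; the app name and the 2g value are just example values, and rowPairRDD is the RDD from Step 2 of the question, persisted here instead of being cache()d):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;

    // Give the executor more heap and, if needed, adjust the caching fraction.
    SparkConf conf = new SparkConf()
            .setAppName("hbase-spark-sql")                // example name
            .set("spark.executor.memory", "2g")           // total executor heap (example value)
            .set("spark.storage.memoryFraction", "0.6");  // share of the heap used for caching (default 0.6)
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Store the cached partitions in serialized form to cut the Java object
    // overhead, trading some CPU for a much smaller memory footprint.
    rowPairRDD.persist(StorageLevel.MEMORY_ONLY_SER());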

Do you know how to cache all the data to avoid the bad performance for the first query?

Invoking any action will cause the RDD to be cached. Just do this

scala> rdd.cache
scala> rdd.count

Now it's cached.
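
In the Java API used in the question, an equivalent warm-up could look like this (a sketch; the COUNT(*) query is my own addition, and it assumes the schemaRDD/testentity setup from Step 3):

    // A cheap query against the cached table forces the HBase scan, the
    // repartition and the cache fill to happen up front, before the first
    // "real" query from Step 4 runs.
    sqlContext.sqlContext().cacheTable("testentity");
    sqlContext.sql("SELECT COUNT(*) FROM testentity").collect();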

answered Sep 23 '22 by Mike Park