My code's algorithm is as follows:
Step 1. Get one HBase entity's data into hBaseRDD.
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
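For completeness, hbase_conf is not shown above; a minimal sketch of how it could be set up (the table name "testentity" here is an assumption for illustration only):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

// Build the HBase configuration consumed by newAPIHadoopRDD
Configuration hbase_conf = HBaseConfiguration.create();
// Tell TableInputFormat which table to scan (assumed table name)
hbase_conf.set(TableInputFormat.INPUT_TABLE, "testentity");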
Step 2. Transform hBaseRDD to rowPairRDD.
// in rowPairRDD the key is HBase's row key, and the Row holds that HBase row's data
JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
.mapToPair(***);
// repartition returns a new RDD, so keep the returned reference before caching it
rowPairRDD = rowPairRDD.repartition(500);
rowPairRDD.cache();
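The mapToPair(***) body is elided above; a hypothetical implementation (the column family "cf" and the qualifier names are assumptions, not taken from the original code) could look like this:
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.api.java.Row;
import scala.Tuple2;

// Hypothetical conversion from an HBase Result to (rowKey, Row);
// null cells are not handled here to keep the sketch short
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Row> toRowPair =
    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Row>() {
        @Override
        public Tuple2<String, Row> call(Tuple2<ImmutableBytesWritable, Result> entry) {
            Result result = entry._2();
            String rowKey = Bytes.toString(result.getRow());
            String column1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")));
            String column2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")));
            String column3 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column3")));
            // Row.create builds an org.apache.spark.sql.api.java.Row in the schema's column order
            return new Tuple2<String, Row>(rowKey, Row.create(column1, column2, column3));
        }
    };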
Step 3. Transform rowPairRDD to schemaRDD.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
schemaRDD.registerTempTable("testentity");
sqlContext.sqlContext().cacheTable("testentity");
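The schema variable passed to applySchema is not shown in the question; with the Spark 1.x Java API it could be built roughly like this (field names assumed to match the queried columns):
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.StructField;
import org.apache.spark.sql.api.java.StructType;

// Assumed: three nullable string columns matching the columns used in the queries below
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataType.createStructField("column1", DataType.StringType, true));
fields.add(DataType.createStructField("column2", DataType.StringType, true));
fields.add(DataType.createStructField("column3", DataType.StringType, true));
StructType schema = DataType.createStructType(fields);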
Step 4. Use Spark SQL to do the first simple SQL query.
JavaSchemaRDD retRDD = sqlContext.sql(
    "SELECT column1, column2 FROM testentity WHERE column3 = 'value1'");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Step 5. Use Spark SQL to do the second simple SQL query.
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity
WHERE column3 = 'value2' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Step 6. Use Spark SQL to do the third simple SQL query.
retRDD = sqlContext.sql(
    "SELECT column1, column2 FROM testentity WHERE column3 = 'value3'");
rows = retRDD.collect();
Test results are as below:
Test case 1:
When I insert 300,000 records into the HBase entity and then run the code:
If I use the HBase API to do a similar query, it only takes 2000 ms. Apparently the last 2 Spark SQL queries are much quicker than the HBase API query.
I believe the 1st Spark SQL query spends a lot of time loading the data from HBase,
so the 1st query is much slower than the last 2 queries. I think this result is expected.
Test case 2:
When I insert 400,000 records into the HBase entity and then run the code:
If I use the HBase API to do a similar query, it only takes 3500 ms. Apparently all 3 Spark SQL queries are much slower than the HBase API query.
And the last 2 Spark SQL queries are also very slow, with performance similar to the first query. Why? How can I tune the performance?
I suspect you are trying to cache more data than you have allocated to your Spark instance. I'll try to break down what is going on in each execution of the exact same query.
First of all, everything in Spark is lazy. This means that when you call rdd.cache(), nothing actually happens until you do something with the RDD.
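A minimal illustration of that laziness, using the question's rowPairRDD as a stand-in:
// cache() only marks the RDD as cacheable; no HBase scan happens yet
rowPairRDD.cache();

// The first action (count, collect, or the first SQL query on the temp table)
// triggers the actual HBase read, the repartition, and the filling of the cache
long total = rowPairRDD.count();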
First Query
1. Read the data from HBase (Step 1)
2. Repartition and cache the RDD (Step 2)
3. Apply the schema and cache the table (Step 3)
4. Run the SQL query and collect the result

Second/Third Query
1. Run the SQL query against data that should, ideally, already be fully cached
Now, Spark will try to cache as much of an RDD as possible. If it can't cache the entire thing, you may run into some serious slowdowns. This is especially true if one of the steps before caching causes a shuffle. You may be repeating steps 1 - 3 of the first query for each subsequent query. That's not ideal.
To check whether an RDD is fully cached, go to your Spark Web UI (http://localhost:4040 if in local standalone mode) and look at the RDD storage/persistence information. Make sure it is at 100%.
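If you prefer to check this programmatically rather than in the UI, a rough sketch (going through the underlying SparkContext) would be:
import org.apache.spark.storage.RDDInfo;

// Same information as the Storage tab: how many partitions of each RDD are cached
for (RDDInfo info : jsc.sc().getRDDStorageInfo()) {
    System.out.println(info.name() + ": " + info.numCachedPartitions()
        + "/" + info.numPartitions() + " partitions cached, "
        + info.memSize() + " bytes in memory");
}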
Edit (per comments):
The 400,000 records in my HBase only amount to about 250 MB. Why do I need 2 GB to fix the issue (given that 1 GB >> 250 MB)?
I can't say for certain why you hit your max limit with spark.executor.memory=1G, but I will add some more relevant information about caching.
By default spark.storage.memoryFraction=0.6, or 60%, so you are really only getting 1GB * 0.6 = roughly 600 MB for caching. On top of that, the in-memory size of a cached RDD is usually larger than the raw data size because of Java object metadata. You can also change the default persistence level (for example, to a serialized storage level) to reduce that overhead.

Do you know how to cache all the data to avoid the bad performance for the first query?
Invoking any action will cause the RDD to be cached. Just do this:
scala> rdd.cache
scala> rdd.count
Now it's cached.
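Putting the suggestions together for the question's Java code, a rough sketch might look like the following; the 2g value echoes the comment above, spark.storage.memoryFraction is the Spark 1.x setting mentioned earlier, and MEMORY_ONLY_SER is just one example of a more compact persistence level, not something prescribed here:
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;

// More executor heap; memoryFraction is the share of the heap used for caching (default 0.6)
SparkConf conf = new SparkConf()
    .set("spark.executor.memory", "2g")
    .set("spark.storage.memoryFraction", "0.6");

// Store the RDD serialized to cut down on Java object metadata overhead
rowPairRDD.persist(StorageLevel.MEMORY_ONLY_SER());

// Force an action once, up front, so the cache is populated
// before the first user-facing query runs
rowPairRDD.count();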