I have two Spark RDDs, dataRDD and newPairDataRDD, which are used for a Spark SQL query. When my application initializes, dataRDD is loaded: all data from one specified HBase entity is stored into dataRDD.
When a client's SQL query arrives, my app loads all the new updates and inserts into newPairDataRDD, unions dataRDD with newPairDataRDD, and registers the result as a table in the Spark SQL context.
I found that even with 0 records in dataRDD and 1 newly inserted record in newPairDataRDD, the union takes 4 seconds. That's too slow and seems unreasonable. Does anyone know how to make it faster? Thanks. Simple code below:
// Step 1: when the app initializes, load all data from HBase into dataRDD; this runs only once.
JavaPairRDD<String, Row> dataRDD = getAllBaseDataToJavaRDD();
dataRDD.persist(StorageLevel.MEMORY_ONLY()); // cache() is shorthand for this, so one call suffices
logger.info(dataRDD.count());
// Step 2: when a Spark SQL query arrives, load the latest updated and inserted data from the DB into newPairDataRDD.
JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
// Step 3: if there are new records, union and reduce.
if (newPairDataRDD.count() > 0) {
    JavaPairRDD<String, Row> unionedRDD = dataRDD.union(newPairDataRDD);
    // If a row was updated in the DB, the old version must be dropped from dataRDD.
    // Note: reduceByKey expects a commutative, associative function; "return r2"
    // relies on evaluation order and is not guaranteed to keep the newer row.
    dataRDD = unionedRDD.reduceByKey(
        new Function2<Row, Row, Row>() {
            @Override
            public Row call(Row r1, Row r2) {
                return r2;
            }
        });
}
// Step 4: apply the schema to dataRDD.
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD.values(), schema);
// Step 5: execute the SQL query.
JavaSchemaRDD retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
From the Spark web UI I can see the stages below. Apparently the union needs 4 s:
Completed Stages (8)
StageId  Description                                 Submitted      Duration  Tasks: Succeeded/Total  Input   Shuffle Read  Shuffle Write
6        collect at SparkPlan.scala:85               1/4/2015 8:17  2 s       8/8                             156.0 B
7        union at SparkSqlQueryForMarsNew.java:389   1/4/2015 8:17  4 s       8/8                     64.0 B                156.0 B
A more efficient way to achieve what you want is to use a cogroup() and a flatMapValues(). Using a union does very little except add new partitions to the dataRDD, meaning all the data must be shuffled before the reduceByKey(). A cogroup() and flatMapValues() will cause repartitioning of only the newPairDataRDD.
JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> cogroupedRDD = dataRDD.cogroup(newPairDataRDD);
JavaPairRDD<String, Row> updated = cogroupedRDD.flatMapValues(
    new Function<Tuple2<Iterable<Row>, Iterable<Row>>, Iterable<Row>>() {
        @Override
        public Iterable<Row> call(Tuple2<Iterable<Row>, Iterable<Row>> grouped) {
            // Keep the updated rows when they exist; otherwise keep the old ones.
            return grouped._2().iterator().hasNext() ? grouped._2() : grouped._1();
        }
    });
Or in Scala
val unioned = dataRDD.cogroup(newPairDataRDD)
val updated = unioned.flatMapValues { case (oldVals, newVals) =>
if (newVals.nonEmpty) newVals else oldVals
}
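Setting Spark aside, the per-key rule that the cogroup/flatMapValues step implements is simply "updated rows replace base rows; untouched keys keep their old rows". A plain-Java sketch of just that rule (the String/List types stand in for the Row values and are illustrative only, not Spark API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeRule {
    // For each key: if the updates map has rows for it, those rows win;
    // otherwise the base rows are kept. This mirrors the
    // "if newVals.nonEmpty then newVals else oldVals" branch above.
    static Map<String, List<String>> merge(Map<String, List<String>> base,
                                           Map<String, List<String>> updates) {
        Map<String, List<String>> result = new HashMap<>(base);
        result.putAll(updates); // updated keys overwrite their base entries
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> base = new HashMap<>();
        base.put("row1", Arrays.asList("old-A"));
        base.put("row2", Arrays.asList("old-B"));

        Map<String, List<String>> updates = new HashMap<>();
        updates.put("row2", Arrays.asList("new-B"));

        Map<String, List<String>> merged = merge(base, updates);
        System.out.println(merged.get("row1")); // [old-A]
        System.out.println(merged.get("row2")); // [new-B]
    }
}
```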
Disclaimer: I'm not used to writing Spark in Java! Please someone correct me if the above is wrong!