
Spark RDD union is very slow

I have two Spark RDDs, dataRDD and newPairDataRDD, which are used for a Spark SQL query. When my application initializes, dataRDD is loaded once: all data from one specific HBase table is stored in it.
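(The body of getAllBaseDataToJavaRDD() is not shown below; for context, a minimal sketch of one common way to load an HBase table into a pair RDD via TableInputFormat follows. The table name "base_data" and the final mapping step are placeholders, not the actual code.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

    // Sketch: scan an HBase table into a JavaPairRDD.
    // "base_data" is a placeholder table name.
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "base_data");

    JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = sc.newAPIHadoopRDD(
        hbaseConf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);
    // A mapToPair(...) step would then turn each Result into a (String, Row) pair.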

When a client's SQL query comes in, my app fetches all new updates and inserts into newPairDataRDD, unions dataRDD with newPairDataRDD, and registers the result as a table in the Spark SQL context.

I found that even with 0 records in dataRDD and 1 newly inserted record in newPairDataRDD, the union takes 4 seconds. That's too slow.

I don't think that is reasonable. Does anyone know how to make it quicker? Thanks. Simplified code below:

    // Step 1: load all data from HBase into dataRDD at startup; this runs only once.
    JavaPairRDD<String, Row> dataRDD = getAllBaseDataToJavaRDD();
    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY()); one call is enough.
    dataRDD.persist(StorageLevel.MEMORY_ONLY());
    logger.info("dataRDD count: " + dataRDD.count());

    // Step 2: when a Spark SQL query comes in, load the latest updated and
    // inserted data from the DB into newPairDataRDD.
    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();

    // Step 3: if there are new records, union and reduce.
    if (newPairDataRDD.count() > 0) {
        JavaPairRDD<String, Row> unionedRDD = dataRDD.union(newPairDataRDD);

        // If data was updated in the DB, the old version must be dropped from
        // dataRDD, so keep only one Row per key.
        dataRDD = unionedRDD.reduceByKey(
            new Function2<Row, Row, Row>() {
                @Override
                public Row call(Row r1, Row r2) {
                    return r2;
                }
            });
    }
    // Step 4: apply the schema to the values of dataRDD.
    JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD.values(), schema);

    // Step 5: execute the SQL query.
    JavaSchemaRDD retRDD = sqlContext.sql(sql);
    List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
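(Not shown between steps 4 and 5: before sqlContext.sql(sql) can reference the data, the JavaSchemaRDD has to be registered as a table. In Spark 1.1/1.2 that would look roughly like the line below, where the table name "baseData" is just a placeholder that must match whatever the query string selects from.)

    // Hypothetical table name; it must match the table referenced in `sql`.
    schemaRDD.registerTempTable("baseData");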

From the Spark web UI I can see the stages below. Apparently the union needs 4 s.

Completed Stages (8)

    Stage Id | Description                               | Submitted     | Duration | Tasks: Succeeded/Total | Input  | Shuffle Read | Shuffle Write
    6        | collect at SparkPlan.scala:85             | 1/4/2015 8:17 | 2 s      | 8/8                    |        | 156.0 B      |
    7        | union at SparkSqlQueryForMarsNew.java:389 | 1/4/2015 8:17 | 4 s      | 8/8                    | 64.0 B |              | 156.0 B

asked Jan 05 '15 by simafengyun


1 Answer

A more efficient way to achieve what you want is to use cogroup() and flatMapValues(). Using union() does very little except add the new partitions to dataRDD, so all of the data still has to be shuffled before the reduceByKey(). A cogroup() with flatMapValues() causes repartitioning of only newPairDataRDD.

    JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> cogroupedRDD =
        dataRDD.cogroup(newPairDataRDD);

    JavaPairRDD<String, Row> updated = cogroupedRDD.flatMapValues(
        new Function<Tuple2<Iterable<Row>, Iterable<Row>>, Iterable<Row>>() {
            @Override
            public Iterable<Row> call(Tuple2<Iterable<Row>, Iterable<Row>> grouped) {
                // Prefer the freshly loaded rows; fall back to the cached ones
                // when a key has no update.
                if (grouped._2().iterator().hasNext()) {
                    return grouped._2();
                } else {
                    return grouped._1();
                }
            }
        });
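One behavioural difference worth noting: reduceByKey collapses every key to a single Row, while flatMapValues keeps all rows in whichever group it returns. The cogroup version also explicitly prefers the rows from newPairDataRDD, whereas "return r2" in the reduceByKey depends on a reduction order that Spark does not guarantee.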

Or in Scala:

    val cogrouped = dataRDD.cogroup(newPairDataRDD)
    val updated = cogrouped.flatMapValues { case (oldVals, newVals) =>
      if (newVals.nonEmpty) newVals else oldVals
    }
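The "only newPairDataRDD is repartitioned" behaviour relies on dataRDD having a known partitioner that cogroup() can reuse. A minimal sketch of setting that up at startup, assuming a HashPartitioner and an arbitrary partition count of 8 (neither is from the original post):

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.storage.StorageLevel;

    // Pre-partition the cached base data once so that later cogroup() calls
    // reuse its partitioner and shuffle only the small RDD of updates.
    JavaPairRDD<String, Row> dataRDD = getAllBaseDataToJavaRDD()
        .partitionBy(new HashPartitioner(8))  // partition count is an assumed value
        .persist(StorageLevel.MEMORY_ONLY());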

Disclaimer: I'm not used to writing Spark in Java, so please correct me if the above is wrong!

answered by Jem Tucker