Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

how to fetch all of data from hbase table in spark

I have a big table in hbase that name is UserAction, and it has three column families(song,album,singer). I need to fetch all of data from 'song' column family as a JavaRDD object. I try this code, but it's not efficient. Is there a better solution to do this ?

    static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster(
        "local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
            .newAPIHadoopRDD(
                    conf,
                    TableInputFormat.class,
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.client.Result.class);

    JavaRDD<Rating> count = hBaseRDD
            .map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD<Rating>>() {

                @Override
                public JavaRDD<Rating> call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    Result r = t._2;
                    int user = Integer.parseInt(Bytes.toString(r.getRow()));
                    ArrayList<Rating> ra = new ArrayList<>();

                    for (Cell c : r.rawCells()) {

                        int product = Integer.parseInt(Bytes
                                .toString(CellUtil.cloneQualifier(c)));
                        double rating = Double.parseDouble(Bytes
                                .toString(CellUtil.cloneValue(c)));

                        ra.add(new Rating(user, product, rating));
                    }

                    return jsc.parallelize(ra);
                }
            })
            .reduce(new Function2<JavaRDD<Rating>, JavaRDD<Rating>, JavaRDD<Rating>>() {
                @Override
                public JavaRDD<Rating> call(JavaRDD<Rating> r1,
                        JavaRDD<Rating> r2) throws Exception {
                    return r1.union(r2);
                }
            });
    jsc.stop();
}

Song column family scheme design is :

RowKey = userID, columnQualifier = songID and value = rating.
like image 722
Fatih Yakut Avatar asked Jul 02 '14 14:07

Fatih Yakut


People also ask

Can Spark read from HBase?

With SHC, Spark can execute batch jobs to read/write data from/into Phoenix tables. Phoenix can also read/write data from/into HBase tables created by SHC.


1 Answers

UPDATE: OK I see your problem now, for some crazy reason your turning your arrays into RDDs return jsc.parallelize(ra);. Why are you doing that?? Why are you creating an RDD of RDDs?? Why not leave them as arrays? When you do the reduce you can then concatenate the arrays. An RDD is a Resistant Distributed Dataset - it does not make logical sense to have a Distributed Dataset of Distributed Datasets. I'm surprised your job even runs and doesn't crash! Anyway that's why your job is so slow.

Anyway, in Scala after your map, you would just do a flatMap(identity) and that would concatenate all your lists together.

I don't really understand why you need to do a reduce, maybe that is where you have something inefficient going on. Here is my code to read HBase tables (its generalized - i.e. works for any scheme). One thing to be sure of is to make sure that when you read the HBase table you ensure the number of partitions is suitable (usually you want a lot).

type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Map(CF -> Map(column qualifier -> Map(timestamp -> value)))
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def readTableAll(table: String): RDD[(Array[Byte], CFTimeseriesRow)] = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
}

As you can see, I have no need for a reduce in my code. The methods are pretty self explainatory. I could dig further into your code, but I lack the patience to read Java as it's so epically verbose.

I have some more code specifically for fetching the most recent elements from the row (rather than the entire history). Let me know if you want to see that.

Finally, recommend you look into using Cassandra over HBase as datastax is partnering with databricks.

like image 155
samthebest Avatar answered Nov 15 '22 01:11

samthebest