Spark and HBase Snapshots

Under the assumption that we could access data much faster if pulling directly from HDFS instead of using the HBase API, we're trying to build an RDD based on a Table Snapshot from HBase.

So, I have a snapshot called "dm_test_snap". I seem to be able to get most of the configuration stuff working, but my RDD is null (despite there being data in the Snapshot itself).

I'm having a hell of a time finding an example of anyone doing offline analysis of HBase snapshots with Spark, but I can't believe I'm alone in trying to get this working. Any help or suggestions are greatly appreciated.

Here is a snippet of my code:

object TestSnap  {
  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val hbaseRootDir =  config.getString("hbase.rootdir")
    val sparkConf = new SparkConf()
      .setAppName("testnsnap")
      .setMaster(config.getString("spark.app.master"))
      .setJars(SparkContext.jarOfObject(this))
      .set("spark.executor.memory", "2g")
      .set("spark.default.parallelism", "160")

    val sc = new SparkContext(sparkConf)

    println("Creating hbase configuration")
    val conf = HBaseConfiguration.create()

    conf.set("hbase.rootdir", hbaseRootDir)
    conf.set("hbase.zookeeper.quorum",  config.getString("hbase.zookeeper.quorum"))
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
    conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")

    val scan = new Scan
    val job = Job.getInstance(conf)

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
        new Path("hdfs://nameservice1/tmp"))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }

}

Update to include the solution: The trick was, as @Holden mentioned below, that the conf wasn't getting passed through. To remedy this, I got it working by changing the call to newAPIHadoopRDD to this:

val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

There was a second issue, also highlighted by @victor's answer: I was not passing in a Scan. To fix that, I added this line and method:

conf.set(TableInputFormat.SCAN, convertScanToString(scan))

def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

This also let me remove this line from the conf.set calls:

conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")

NOTE: This was for HBase version 0.96.1.1 on CDH 5.0.

Final full code for easy reference:

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object TestSnap {
  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val hbaseRootDir = config.getString("hbase.rootdir")
    val sparkConf = new SparkConf()
      .setAppName("testnsnap")
      .setMaster(config.getString("spark.app.master"))
      .setJars(SparkContext.jarOfObject(this))
      .set("spark.executor.memory", "2g")
      .set("spark.default.parallelism", "160")

    val sc = new SparkContext(sparkConf)

    println("Creating hbase configuration")
    val conf = HBaseConfiguration.create()

    conf.set("hbase.rootdir", hbaseRootDir)
    conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))

    // Serialize the Scan into the configuration so the snapshot input format can find it.
    val scan = new Scan
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    // The Job copies the Configuration, so the snapshot is configured on the Job
    // and job.getConfiguration (not the original conf) is handed to Spark.
    val job = Job.getInstance(conf)

    TableSnapshotInputFormat.setInput(job, "dm_test_snap",
      new Path("hdfs://nameservice1/tmp"))

    val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }

  // Serialize a Scan to the Base64-encoded protobuf string the input format expects.
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

}
asked Jun 10 '15 by dmcnelis

2 Answers

Looking at the Job code, it makes a copy of the conf object you supply (the Job copies the Configuration so that any necessary internal modifications do not reflect on the incoming parameter), so most likely the information you set on the conf object isn't getting passed down to Spark. You could instead use TableSnapshotInputFormatImpl, which has a similar method that works on conf objects directly. There might be additional things needed, but at a first pass through the problem this seems like the most likely cause.
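For illustration, a minimal sketch of that alternative, configuring the snapshot directly on the Configuration via TableSnapshotInputFormatImpl.setInput. The method signature is assumed from the HBase mapreduce package and may not exist in older releases such as 0.96, so verify it against your HBase version:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableSnapshotInputFormat, TableSnapshotInputFormatImpl}
import org.apache.spark.SparkContext

// Assumes an existing SparkContext `sc`, the snapshot "dm_test_snap", and the
// restore directory used elsewhere in this thread.
def snapshotRDD(sc: SparkContext) = {
  val conf = HBaseConfiguration.create()

  // Configure the snapshot on the Configuration itself, so no Job copy is involved.
  TableSnapshotInputFormatImpl.setInput(conf, "dm_test_snap", new Path("hdfs://nameservice1/tmp"))

  // A serialized Scan still needs to be placed in the configuration, as shown in the
  // question's update, or the record reader may have nothing to scan with.

  // Hand the same Configuration to Spark; the snapshot settings are now visible to the input format.
  sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}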

As pointed out in the comments, another option is to use job.getConfiguration to get the updated config from the job object.

answered Oct 17 '22 by Holden

You have not configured your M/R job properly. Here is an example in Java of how to configure an M/R job over snapshots:

Job job = new Job(conf);
Scan scan = new Scan();
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
    scan, MyTableMapper.class, MyMapKeyOutput.class,
    MyMapOutputValueWritable.class, job, true);

You definitely skipped the Scan. I suggest taking a look at TableMapReduceUtil's initTableSnapshotMapperJob implementation to get an idea of how to configure the job in Spark/Scala.
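As a rough Spark/Scala sketch of the pieces initTableSnapshotMapperJob sets up, one might build a restricted Scan and serialize it into the configuration by hand before configuring the snapshot on the Job. The column family "cf", the caching value, and the restore path are placeholders, and it is assumed here that the snapshot input format reads the serialized Scan from the TableInputFormat.SCAN key, as the question's update suggests:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()

// Build a Scan that restricts what gets read; "cf" is a placeholder column family.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("cf"))
scan.setCaching(500)

// initTableSnapshotMapperJob serializes the Scan into the job configuration; doing the
// same by hand makes it available to the snapshot's record readers.
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

// Register the snapshot and a restore directory on the Job, then pass job.getConfiguration
// (not the original conf) to sc.newAPIHadoopRDD as in the question's final code.
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap", new Path("hdfs://nameservice1/tmp"))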

answered Oct 17 '22 by Vladimir Rodionov