The below code will read from the hbase, then convert it to json structure and the convert to schemaRDD , But the problem is that I am using List
to store the json string then pass to javaRDD, for data of about 100 GB the master will be loaded with data in memory. What is the right way to load the data from hbase then perform manipulation,then convert to JavaRDD.
package hbase_reader; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.JavaSchemaRDD; import org.apache.commons.cli.ParseException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import scala.Function1; import scala.Tuple2; import scala.runtime.AbstractFunction1; import com.google.common.collect.Lists; public class hbase_reader { public static void main(String[] args) throws IOException, ParseException { List<String> jars = Lists.newArrayList(""); SparkConf spconf = new SparkConf(); spconf.setMaster("local[2]"); spconf.setAppName("HBase"); //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1"); spconf.setJars(jars.toArray(new String[jars.size()])); JavaSparkContext sc = new JavaSparkContext(spconf); //spconf.set("spark.executor.memory", "1g"); JavaSQLContext jsql = new JavaSQLContext(sc); HBaseConfiguration conf = new HBaseConfiguration(); String tableName = "HBase.CounData1_Raw_Min1"; HTable table = new HTable(conf,tableName); try { ResultScanner scanner = table.getScanner(new Scan()); List<String> jsonList = new ArrayList<String>(); String json = null; for(Result rowResult:scanner) { json = ""; String rowKey = Bytes.toString(rowResult.getRow()); for(byte[] s1:rowResult.getMap().keySet()) { String s1_str = Bytes.toString(s1); String jsonSame = ""; for(byte[] s2:rowResult.getMap().get(s1).keySet()) { String s2_str = Bytes.toString(s2); for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) { String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3)); jsonSame += "\""+s2_str+"\":"+s3_str+","; } } jsonSame = jsonSame.substring(0,jsonSame.length()-1); json += "\""+s1_str+"\""+":{"+jsonSame+"}"+","; } json = json.substring(0,json.length()-1); json = "{\"RowKey\":\""+rowKey+"\","+json+"}"; jsonList.add(json); } JavaRDD<String> jsonRDD = sc.parallelize(jsonList); JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD); System.out.println(schemaRDD.take(2)); } finally { table.close(); } } }
Below HBase libraries are required to connect Spark with the HBase database and perform read and write rows to the table. hbase-client This library provides by HBase which is used natively to interact with HBase.
You can use Spark-Hbase connector to access HBase from Spark.It provides an API in both low-level RDD and Dataframes . The connector requires you to define a Schema for HBase table. Below is an example of Schema defined for a HBase table with name as table1 , row key as key and a number of columns (col1-col8).
1 Answer. Spark Streaming framework helps in developing applications that can perform analytics on streaming, real-time data - such as analyzing video or social media data, in real-time. In fast-changing industries such as marketing, performing real-time analytics is very important.
To access the HBase shell, you have to navigate to the HBase home folder. You can start the HBase interactive shell using “hbase shell” command as shown below. If you have successfully installed HBase in your system, then it gives you the HBase shell prompt as shown below.
A Basic Example to Read the HBase data using Spark (Scala), You can also wrtie this in Java :
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseRead { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val tableName = "table1" System.setProperty("user.name", "hdfs") System.setProperty("HADOOP_USER_NAME", "hdfs") conf.set("hbase.master", "localhost:60000") conf.setInt("timeout", 120000) conf.set("hbase.zookeeper.quorum", "localhost") conf.set("zookeeper.znode.parent", "/hbase-unsecure") conf.set(TableInputFormat.INPUT_TABLE, tableName) val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tableName)) { val tableDesc = new HTableDescriptor(tableName) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) println("Number of Records found : " + hBaseRDD.count()) sc.stop() } }
As of Spark 1.0.x+, Now you can use Spark-HBase Connector also :
Maven Dependency to Include :
<dependency> <groupId>it.nerdammer.bigdata</groupId> <artifactId>spark-hbase-connector_2.10</artifactId> <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x </dependency>
And find a below sample code for the same :
import org.apache.spark._ import it.nerdammer.spark.hbase._ object HBaseRead extends App { val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]") sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme val sc = new SparkContext(sparkConf) // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then: val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document") .select("DocID", "Title").inColumnFamily("SMPL") println("Number of Records found : " + docRdd .count()) }
As of Spark 1.6.x+, Now you can use SHC Connector also (Hortonworks or HDP users) :
Maven Dependency to Include :
<dependency> <groupId>com.hortonworks</groupId> <artifactId>shc</artifactId> <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x </dependency>
The Main advantage of using this connector is that it have flexibility in the Schema definition and doesn't need Hardcoded params just like in nerdammer/spark-hbase-connector. Also remember that it supports Spark 2.x so this connector is pretty much flexible and provides end-to-end support in Issues and PRs.
Find the below repository path for the latest readme and samples :
Hortonworks Spark HBase Connector
You can also convert this RDD's to DataFrames and run SQL over it or You can map these Dataset or DataFrames to user defined Java Pojo's or Case classes. It works brilliant.
Please comment below if you need anything else.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With