Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark Streaming with Hbase

I am trying to get data from HBase. In all the tutorials I have found, getting the data from HBase requires going through Kafka. Is a direct integration between Spark Streaming and HBase possible, without including Kafka in the chain? Thanks.

like image 861
Sendi Zied Avatar asked Oct 18 '22 19:10

Sendi Zied


1 Answer

Is a direct integration between Spark Streaming and HBase possible without including Kafka?

Yes, it's possible; we have done the same without using Kafka. See the example below, JavaHBaseStreamingBulkPutExample:

package org.apache.hadoop.hbase.spark.example.hbasecontext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * This is a simple example of BulkPut with Spark Streaming
 */
final public class JavaHBaseStreamingBulkPutExample {

  private JavaHBaseStreamingBulkPutExample() {}

  public static void main(String[] args) {
    if (args.length < 4) {
      System.out.println("JavaHBaseBulkPutExample  " +
              "{host} {port} {tableName}");
      return;
    }

    String host = args[0];
    String port = args[1];
    String tableName = args[2];

    SparkConf sparkConf =
            new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
                    tableName + ":" + port + ":" + tableName);

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      JavaStreamingContext jssc =
              new JavaStreamingContext(jsc, new Duration(1000));

      JavaReceiverInputDStream<String> javaDstream =
              jssc.socketTextStream(host, Integer.parseInt(port));

      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      hbaseContext.streamBulkPut(javaDstream,
              TableName.valueOf(tableName),
              new PutFunction());
    } finally {
      jsc.stop();
    }
  }

  public static class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;

    public Put call(String v) throws Exception {
      String[] part = v.split(",");
      Put put = new Put(Bytes.toBytes(part[0]));

      put.addColumn(Bytes.toBytes(part[1]),
              Bytes.toBytes(part[2]),
              Bytes.toBytes(part[3]));
      return put;
    }

  }
}
like image 56
Ram Ghadiyaram Avatar answered Oct 21 '22 08:10

Ram Ghadiyaram