
Spark Structured Streaming with HBase integration

We are streaming Kafka data that is being collected from MySQL. Once all the analytics are done, I want to save the results directly to HBase. I have gone through the Spark Structured Streaming documentation but couldn't find any sink for HBase. The code I use to read the data from Kafka is below.

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._
    val records = spark.readStream
      .format("kafka")
      .option("subscribe", "kaapociot")
      .option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667")
      .option("startingOffsets", "earliest")
      .load()
    val jsonschema = StructType(Seq(
      StructField("header", StringType, true),
      StructField("event", StringType, true)))
    val uschema = StructType(Seq(
      StructField("MeterNumber", StringType, true),
      StructField("Utility", StringType, true),
      StructField("VendorServiceNumber", StringType, true),
      StructField("VendorName", StringType, true),
      StructField("SiteNumber", StringType, true),
      StructField("SiteName", StringType, true),
      StructField("Location", StringType, true),
      StructField("timestamp", LongType, true),
      StructField("power", DoubleType, true)))
    // Parse the Kafka value twice: the outer envelope first, then the event payload
    val DF_Hbase = records.selectExpr("cast(value as string) as json")
      .select(from_json($"json", jsonschema).as("data"))
      .select("data.event")
      .select(from_json($"event", uschema).as("mykafkadata"))
      .select("mykafkadata.*")

Now, finally, I want to save the DF_Hbase DataFrame to HBase.

asked Nov 07 '17 by HARENDRA SINGH


People also ask

What is the difference between Spark Streaming and Structured Streaming?

Spark Streaming receives real-time data and divides it into smaller micro-batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for data stream processing; its queries go through the Catalyst optimizer and are ultimately executed as RDDs under the hood.
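
For instance, a minimal Structured Streaming sketch (the socket source and the word-count query here are purely illustrative) shows that the stream is expressed with the ordinary DataFrame/Dataset API:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("structured-example").getOrCreate()
    import spark.implicits._

    // A streaming DataFrame from a socket source; regular Spark SQL operators apply to it
    val lines = spark.readStream.format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val counts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

    counts.writeStream.outputMode("complete").format("console").start().awaitTermination()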

Is Spark Streaming deprecated?

The DStream-based Spark Streaming API (including the Kafka Direct API) is now treated as a legacy component and no longer receives new features; Structured Streaming is the recommended engine for new streaming applications.

Can Apache spark handle stream processing?

Yes. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
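
For comparison, a minimal sketch of the older DStream API (again with an illustrative socket source and 10-second micro-batches):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("dstream-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10)) // 10-second micro-batches

    // Classic word count over each micro-batch
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    words.map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()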


2 Answers

1- Add these libraries to your project (see the build.sbt sketch below):

      "org.apache.hbase" % "hbase-client" % "2.0.1"
      "org.apache.hbase" % "hbase-common" % "2.0.1"

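In sbt that would look something like this sketch (versions are examples; align them with your cluster):

      // build.sbt (sketch)
      libraryDependencies ++= Seq(
        "org.apache.hbase" % "hbase-client" % "2.0.1",
        "org.apache.hbase" % "hbase-common" % "2.0.1"
      )
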
2- Add this trait to your code:

   import java.util.concurrent.ExecutorService
   import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
   import org.apache.hadoop.hbase.security.User
   import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
   import org.apache.spark.sql.ForeachWriter

   trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {

     val tableName: String
     val hbaseConfResources: Seq[String]

     def pool: Option[ExecutorService] = None

     def user: Option[User] = None

     private var hTable: Table = _
     private var connection: Connection = _


     override def open(partitionId: Long, version: Long): Boolean = {
       connection = createConnection()
       hTable = getHTable(connection)
       true
     }

     def createConnection(): Connection = {
       val hbaseConfig = HBaseConfiguration.create()
       hbaseConfResources.foreach(hbaseConfig.addResource)
       ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
     }

     def getHTable(connection: Connection): Table = {
       connection.getTable(TableName.valueOf(tableName))
     }

     override def process(record: RECORD): Unit = {
       val put = toPut(record)
       hTable.put(put)
     }

     override def close(errorOrNull: Throwable): Unit = {
       hTable.close()
       connection.close()
     }

     def toPut(record: RECORD): Put

   }

3- Use it in your streaming logic:

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes

    val ds = .... // any Dataset[WhatEverYourDataType]

    val query = ds.writeStream
      .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
        override val tableName: String = "hbase-table-name"
        // your cluster config files; here they are assumed to be on the classpath (resources)
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(record: WhatEverYourDataType): Put = {
          val key = .....
          val columnFamilyName: String = ....
          val columnName: String = ....
          val columnValue = ....

          val p = new Put(Bytes.toBytes(key))
          // add columns ...
          p.addColumn(Bytes.toBytes(columnFamilyName),
                      Bytes.toBytes(columnName),
                      Bytes.toBytes(columnValue))
          p
        }
      })
      .start()

    query.awaitTermination()
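
To tie this back to the question's DF_Hbase, a concrete sketch could look like the following; the table name "meter_readings", the column family "d" and the MeterNumber + timestamp row key are assumptions of mine, not something defined in the original post:

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.Row

    val query = DF_Hbase.writeStream
      .foreach(new HBaseForeachWriter[Row] {
        // Assumed table and column-family names -- adjust to your HBase schema
        override val tableName: String = "meter_readings"
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(r: Row): Put = {
          // Assumed row key: MeterNumber plus the event timestamp
          val key = s"${r.getAs[String]("MeterNumber")}_${r.getAs[Long]("timestamp")}"
          val p = new Put(Bytes.toBytes(key))
          p.addColumn(Bytes.toBytes("d"), Bytes.toBytes("power"),
                      Bytes.toBytes(r.getAs[Double]("power")))
          p.addColumn(Bytes.toBytes("d"), Bytes.toBytes("SiteName"),
                      Bytes.toBytes(r.getAs[String]("SiteName")))
          p
        }
      })
      .start()

    query.awaitTermination()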
answered Sep 18 '22 by Mohamed Abdo


This method worked for me, even from PySpark: https://github.com/hortonworks-spark/shc/issues/205

package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Re-create a plain (batch) DataFrame from the micro-batch's RDD so the
    // regular SHC batch writer can be used on streaming data
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

I added a file named HBaseSinkProvider.scala to shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase and built it; the example works perfectly.
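
The string passed via the hbasecat option is an SHC table catalog (HBaseTableCatalog.tableCatalog). A minimal sketch, with an assumed table name, column family and column mapping:

// Sketch of an SHC catalog; the table name, the "cf" column family and the mappings are assumptions
val catalog = """{
    |"table":{"namespace":"default", "name":"meter_readings"},
    |"rowkey":"key",
    |"columns":{
      |"MeterNumber":{"cf":"rowkey", "col":"key", "type":"string"},
      |"power":{"cf":"cf", "col":"power", "type":"double"},
      |"timestamp":{"cf":"cf", "col":"timestamp", "type":"bigint"}
    |}
|}""".stripMargin

The HBaseTableCatalog.newTable -> "5" option in the sink above tells SHC to create the table with five regions if it does not already exist.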

This is an example of how to use it (Scala):

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

inputDF.
   writeStream.
   queryName("hbase writer").
   format("HBase.HBaseSinkProvider").
   option("checkpointLocation", checkPointProdPath).
   option("hbasecat", catalog).
   outputMode(OutputMode.Update()).
   trigger(Trigger.ProcessingTime(30.seconds)).
   start

And an example of how I use it in Python:

inputDF \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', catalog_kafka) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .start()
answered Sep 19 '22 by ser0t0nin