We are streaming Kafka data that is collected from MySQL. Once all the analytics are done, I want to save the data directly to HBase. I have gone through the Spark Structured Streaming documentation but couldn't find any sink for HBase. The code I used to read the data from Kafka is below.
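// Assumes a SparkSession named `spark`, as in spark-shell.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._
val records = spark.readStream.format("kafka").option("subscribe", "kaapociot").option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667").option("startingOffsets", "earliest").load()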
val jsonschema = StructType(Seq(StructField("header", StringType, true),StructField("event", StringType, true)))
val uschema = StructType(Seq(
  StructField("MeterNumber", StringType, true),
  StructField("Utility", StringType, true),
  StructField("VendorServiceNumber", StringType, true),
  StructField("VendorName", StringType, true),
  StructField("SiteNumber", StringType, true),
  StructField("SiteName", StringType, true),
  StructField("Location", StringType, true),
  StructField("timestamp", LongType, true),
  StructField("power", DoubleType, true)
))
// Parse the outer JSON envelope first, then the nested "event" payload.
val DF_Hbase = records
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", jsonschema).as("data"))
  .select("data.event")
  .select(from_json($"event", uschema).as("mykafkadata"))
  .select("mykafkadata.*")
Finally, I want to save the DF_Hbase DataFrame to HBase.
Spark Streaming (the DStream API) receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. Structured Streaming queries, like other Spark SQL queries, are optimized by the Catalyst optimizer and executed as RDD operations under the hood.
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
1- Add these libraries to your project:
"org.apache.hbase" % "hbase-client" % "2.0.1"
"org.apache.hbase" % "hbase-common" % "2.0.1"
2- Add this trait to your code:
import java.util.concurrent.ExecutorService
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.ForeachWriter
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String
  val hbaseConfResources: Seq[String]

  def pool: Option[ExecutorService] = None
  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _

  // Opens one connection and table handle per partition; returning true
  // tells Spark to go on and process the rows.
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    val hbaseConfig = HBaseConfiguration.create()
    hbaseConfResources.foreach(hbaseConfig.addResource)
    ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(tableName))
  }

  // Writes each record as a single Put.
  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  // Null checks keep close() safe even if open() did not initialize both fields.
  override def close(errorOrNull: Throwable): Unit = {
    if (hTable != null) hTable.close()
    if (connection != null) connection.close()
  }

  // Maps a record to the HBase Put that stores it.
  def toPut(record: RECORD): Put
}
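The `process` method in the trait calls `hTable.put` once per record, which typically means one round trip to HBase per row. If that becomes a bottleneck, one option is to buffer the `Put`s and write them in groups. The following is only a sketch of that idea: `BatchBuffer` is a hypothetical helper, not part of Spark or HBase, and the batch size is something you would have to tune yourself.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not an HBase or Spark class): collects items and
// hands them to `flush` in groups of at most `maxSize`.
final class BatchBuffer[A](maxSize: Int)(flush: List[A] => Unit) {
  require(maxSize > 0, "maxSize must be positive")

  private val pending = ArrayBuffer.empty[A]

  // Queue one item and flush automatically once the buffer is full.
  def add(item: A): Unit = {
    pending += item
    if (pending.size >= maxSize) drain()
  }

  // Flush whatever is still queued; call this before closing the table.
  def drain(): Unit =
    if (pending.nonEmpty) {
      val batch = pending.toList
      pending.clear()
      flush(batch)
    }
}
```

To wire it into the trait, the flush function could be `puts => hTable.put(puts.asJava)` (with `scala.collection.JavaConverters._` imported, since `Table.put` also accepts a `java.util.List[Put]`), `process` would call `buffer.add(toPut(record))`, and `close` would call `buffer.drain()` before closing the table.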
3- Use it in your logic:
import org.apache.hadoop.hbase.util.Bytes

val ds = .... // any Dataset[WhatEverYourDataType]
val query = ds.writeStream
  .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
    override val tableName: String = "hbase-table-name"
    // Your cluster configuration files; I assume here they are in resources
    override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

    override def toPut(record: WhatEverYourDataType): Put = {
      val key = .....
      val columnFamilyName: String = ....
      val columnName: String = ....
      val columnValue = ....
      val p = new Put(Bytes.toBytes(key))
      // Add columns ...
      p.addColumn(Bytes.toBytes(columnFamilyName),
        Bytes.toBytes(columnName),
        Bytes.toBytes(columnValue))
      p
    }
  })
  .start()

query.awaitTermination()
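For the meter data in the question, a filled-in `toPut` might look roughly like the sketch below. Everything specific here is an assumption made for illustration: the `MeterReading` case class (a subset of the question's columns, with non-null values), the column family `d`, the qualifiers, and the `<meter>:<timestamp>` row key layout. It needs `hbase-client` on the classpath but no running cluster, because building a `Put` is a purely local operation.

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical record type: a subset of the columns from the question.
final case class MeterReading(
  meterNumber: String,
  siteName: String,
  timestamp: Long,
  power: Double
)

object MeterPuts {
  // Assumed column family; it must already exist on the target table.
  private val Family: Array[Byte] = Bytes.toBytes("d")

  // Assumed row key layout: "<meter>:<timestamp>", i.e. one row per reading.
  def rowKey(r: MeterReading): String = s"${r.meterNumber}:${r.timestamp}"

  // Builds the Put for one reading; no connection to HBase is needed here.
  def toPut(r: MeterReading): Put = {
    val put = new Put(Bytes.toBytes(rowKey(r)))
    put.addColumn(Family, Bytes.toBytes("site_name"), Bytes.toBytes(r.siteName))
    put.addColumn(Family, Bytes.toBytes("power"), Bytes.toBytes(r.power))
    put
  }
}
```

Inside an `HBaseForeachWriter[MeterReading]`, the override would then simply be `override def toPut(record: MeterReading): Put = MeterPuts.toPut(record)`. Including the timestamp in the row key keeps every reading; keying by meter number alone would make each new reading overwrite the previous one.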
This method worked for me even using pyspark: https://github.com/hortonworks-spark/shc/issues/205
package HBase

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}
I added a file named HBaseSinkProvider.scala to shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase and built it; the example works perfectly.
Here is an example of how to use it (Scala):
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._ // provides 30.seconds

inputDF.
  writeStream.
  queryName("hbase writer").
  format("HBase.HBaseSinkProvider").
  option("checkpointLocation", checkPointProdPath).
  option("hbasecat", catalog).
  outputMode(OutputMode.Update()).
  trigger(Trigger.ProcessingTime(30.seconds)).
  start()
And an example of how I use it in Python:
inputDF \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', catalog_kafka) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .start()
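Neither snippet shows what the `catalog` / `catalog_kafka` value passed as `hbasecat` looks like. SHC expects a JSON string that maps DataFrame columns to HBase column families and qualifiers. The following is a sketch of such a catalog for a few of the question's columns; the table name `meter_readings`, the column family `d`, and the qualifiers are assumptions made for illustration, so adjust them to your own table.

```scala
object MeterCatalog {
  // Hypothetical SHC catalog: the keys under "columns" are DataFrame column
  // names; "cf" and "col" say where each value is stored in HBase.
  val catalog: String =
    """{
      |  "table": {"namespace": "default", "name": "meter_readings"},
      |  "rowkey": "key",
      |  "columns": {
      |    "MeterNumber": {"cf": "rowkey", "col": "key", "type": "string"},
      |    "SiteName": {"cf": "d", "col": "site_name", "type": "string"},
      |    "timestamp": {"cf": "d", "col": "ts", "type": "bigint"},
      |    "power": {"cf": "d", "col": "power", "type": "double"}
      |  }
      |}""".stripMargin
}
```

With `MeterNumber` alone as the row key, a later reading for the same meter replaces the earlier one, so if you need the full history you would probably want to derive a key column that also includes the timestamp before writing.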