Split single DStream into multiple Hive tables

I am working on a Kafka and Spark Streaming project. Spark Streaming receives data from Kafka, and the data is in JSON format. Sample input:

{ "table": "tableA", "Product_ID": "AGSVGF.upf", "file_timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", "hdfs_file_name": "null_1532631600050", "Date_Time": "2018-07-26T13:45:01.0000000Z", "User_Name": "UBAHTSD" }

{ "table": "tableB", "Test_ID": "FAGS.upf", "timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", "name": "flink", "time": "2018-07-26T13:45:01.0000000Z", "Id": "UBAHTGADSGSCVDGHASD" }

One JSON string is one message. There are 15 types of JSON string, distinguished by the table column. Now I want to save these 15 different JSON types into Apache Hive. So I have created a DStream, and on the basis of the table column I filter the RDD and save each subset into Hive. The code works fine, but sometimes a batch takes much longer than the Spark batch interval. I have controlled the input using spark.streaming.kafka.maxRatePerPartition=10 and repartitioned the RDD into 9 partitions, but on the Spark UI it shows an unknown stage. (Spark UI screenshot omitted.)

Here is my code.

val dStream = dataStream.transform(rdd => rdd.repartition(9)).map(_._2)

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sparkContext = rdd.sparkContext
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    val hiveContext = getInstance(sparkContext)

    val tableA = rdd.filter(_.contains("tableA"))
    if (!tableA.isEmpty()) {
      HiveUtil.tableA(hiveContext.read.json(tableA))
      tableA.unpersist(true)
    }

    val tableB = rdd.filter(_.contains("tableB"))
    if (!tableB.isEmpty()) {
      HiveUtil.tableB(hiveContext.read.json(tableB))
      tableB.unpersist(true)
    }

    // ... same pattern repeated for the remaining tables, up to 15 ...

    val tableK = rdd.filter(_.contains("tableK"))
    if (!tableK.isEmpty()) {
      HiveUtil.tableK(hiveContext.read.json(tableK))
      tableK.unpersist(true)
    }
  }
}

How can I optimise the code?

Thank you.

Asked by lucy on Jul 29 '18
1 Answer

Purely from a management perspective, I would suggest you parameterize your job to accept the table name, then run 15 separate Spark applications. Also ensure that the Kafka consumer group is different for each application.

This way, you can more easily monitor which Spark job is not performing as well as the others, and a skew of data toward one table won't cause issues for the others.
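To make that concrete, here is a minimal sketch of such a parameterized job, assuming Spark 2.x with the spark-streaming-kafka-0-10 integration. The topic name, broker address, batch interval, and insertInto target are placeholders, not details from the question:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SingleTableIngest {
  def main(args: Array[String]): Unit = {
    val tableName = args(0) // e.g. "tableA"; each of the 15 applications gets its own value

    val spark = SparkSession.builder()
      .appName(s"hive-ingest-$tableName")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(30)) // batch interval is an assumption

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "broker:9092",             // placeholder
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG                 -> s"hive-ingest-$tableName", // distinct group per app
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("input-topic"), kafkaParams))

    stream.map(_.value())
      .filter(_.contains(tableName)) // same crude string match the question uses
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          val df = spark.read.json(spark.createDataset(rdd))
          df.write.mode("append").insertInto(tableName) // assumes the Hive table already exists
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}

Each of the 15 applications is submitted with a different table name, so a slow or skewed table only delays its own job.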

It's not clear what the Kafka message keys are, but if the messages were produced with the table as the key, then Spark could scale along with the Kafka partitions, and you're guaranteed that all messages for each table will be in order.
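For illustration only, a producer keyed by table name might look like the sketch below; the topic, broker, and object name are assumptions, and the JSON is the question's own sample. With such keys, Kafka's default partitioner sends every message for a given table to the same partition, preserving per-table order:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object KeyedProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092") // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)

    val json = """{ "table": "tableA", "Product_ID": "AGSVGF.upf", "User_Name": "UBAHTSD" }"""
    val tableName = "tableA" // in practice, extracted from the JSON before sending

    // The table name is the record key, so all tableA messages land in one partition.
    producer.send(new ProducerRecord[String, String]("input-topic", tableName, json))

    producer.flush()
    producer.close()
  }
}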

Overall, I would actually use Kafka Connect or StreamSets for writing to HDFS/Hive, so that you don't have to write code or tune Spark settings.

Answered by OneCricketeer