I am working on a Kafka + Spark Streaming project. Spark Streaming receives the data from Kafka, and the data is in JSON format. Sample input:
{ "table": "tableA", "Product_ID": "AGSVGF.upf", "file_timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", "hdfs_file_name": "null_1532631600050", "Date_Time": "2018-07-26T13:45:01.0000000Z", "User_Name": "UBAHTSD" }
{ "table": "tableB", "Test_ID": "FAGS.upf", "timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", "name": "flink", "time": "2018-07-26T13:45:01.0000000Z", "Id": "UBAHTGADSGSCVDGHASD" }
One JSON string is one message. There are 15 types of JSON string, distinguished by the table column. I want to save these 15 different JSON types into Apache Hive, so I have created a DStream and, based on the table column, I filter the RDD and save each subset into Hive. The code works fine, but sometimes a batch takes much longer to process than the Spark batch interval. I have limited the input rate with spark.streaming.kafka.maxRatePerPartition=10 and repartitioned the RDD into 9 partitions, but on the Spark UI it shows up as an unknown stage.
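For context, this is roughly how the rate limit is applied when building the streaming context (a simplified sketch; the app name and batch interval here are placeholders, not my real values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaToHive")                                // placeholder app name
  .set("spark.streaming.kafka.maxRatePerPartition", "10")   // cap records pulled per Kafka partition per second
val ssc = new StreamingContext(conf, Seconds(60))           // placeholder batch interval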
Here is my code.
val dStream = dataStream.transform(rdd => rdd.repartition(9)).map(_._2)

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sparkContext = rdd.sparkContext
    // cache the batch once, since it is filtered once per table below
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    val hiveContext = getInstance(sparkContext)

    val tableA = rdd.filter(_.contains("tableA"))
    if (!tableA.isEmpty()) {
      HiveUtil.tableA(hiveContext.read.json(tableA))
      tableA.unpersist(true)
    }

    val tableB = rdd.filter(_.contains("tableB"))
    if (!tableB.isEmpty()) {
      HiveUtil.tableB(hiveContext.read.json(tableB))
      tableB.unpersist(true)
    }

    // ..... up to 15 tables .....

    val tableK = rdd.filter(_.contains("tableK"))
    if (!tableK.isEmpty()) {
      HiveUtil.tableK(hiveContext.read.json(tableK))
      tableK.unpersist(true)
    }
  }
}
How can I optimise the code?
Thank you.
Purely from a management perspective, I would suggest you parameterize your job to accept the table name, then run 15 separate Spark applications. Also ensure that the Kafka consumer group is different for each application.
This way, you can more easily monitor which Spark job is not performing as well as the others, and a skew of data towards one table won't cause issues for the others.
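For illustration, a parameterized per-table job could look roughly like this (a sketch only, assuming the spark-streaming-kafka-0-10 direct stream; the broker address, topic name, batch interval, and the Hive write step are placeholders you would swap for your own):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object PerTableJob {
  def main(args: Array[String]): Unit = {
    val tableName = args(0)                                  // e.g. "tableA", passed at spark-submit time

    val conf = new SparkConf().setAppName(s"kafka-to-hive-$tableName")
    val ssc  = new StreamingContext(conf, Seconds(60))       // placeholder batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",                  // placeholder
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> s"hive-loader-$tableName"      // distinct consumer group per application
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaParams)  // placeholder topic
    )

    stream.map(_.value)
      .filter(_.contains(tableName))                         // keep only this job's table
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          // write just this table here, e.g. hiveContext.read.json(rdd)
          // followed by the matching HiveUtil call from your existing code
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}

The same jar is then submitted 15 times, once per table name.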
It's not clear what the Kafka message keys are, but if the messages were produced with the table as the key, then Spark could scale along with the Kafka partitions, and you're guaranteed all messages for each table will be in order.
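If you control the producers, keying by table would look roughly like this (a sketch; the broker address, topic, and JSON payload are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")   // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val json = """{"table":"tableA","User_Name":"UBAHTSD"}"""   // shortened example payload

// key = table name, so the default partitioner hashes every tableA record to the same partition
producer.send(new ProducerRecord[String, String]("topic", "tableA", json))
producer.close()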
Overall, I would actually use Kafka Connect or StreamSets for writing to HDFS/Hive, so you don't have to write code or tune Spark settings at all.