
Exception while accessing KafkaOffset from RDD

I have a Spark consumer which streams from Kafka. I am trying to manage offsets for exactly-once semantics.

However, while accessing the offset it throws the following exception:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"

The part of the code that does this is below:

var offsetRanges = Array[OffsetRange]()

dataStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .foreachRDD(rdd => { })

Here dataStream is a direct stream (DStream[String]) created with the KafkaUtils API, something like:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Can somebody help me understand what I am doing wrong here? transform is the first method in the chain of methods applied to the stream, just as the official documentation recommends.

Thanks.

asked Mar 12 '23 by taransaini43

1 Answer

Your problem is:

.map(_._2)

which creates a MappedDStream instead of the DirectKafkaInputDStream created by KafkaUtils.createDirectStream. Only the RDDs produced directly by the Kafka input stream implement HasOffsetRanges; after the map they are plain MapPartitionsRDDs, hence the ClassCastException.
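As a quick illustration (my own sketch, not part of the original answer), pattern matching instead of a blind cast makes the failure mode visible:

dataStream.foreachRDD { rdd =>
  rdd match {
    case r: HasOffsetRanges =>
      // Only RDDs coming straight from the direct Kafka stream land here.
      println(r.offsetRanges.mkString(", "))
    case other =>
      // After .map(_._2) the RDD is a MapPartitionsRDD and ends up here.
      println(s"${other.getClass.getName} carries no offset ranges")
  }
}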

You need to map after transform:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

var offsetRanges = Array[OffsetRange]()

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set(source_schema + "_" + t))

kafkaStream
  .transform { rdd =>
    // The RDD is still a KafkaRDD here, so the cast succeeds.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(_._2)
  .foreachRDD { rdd =>
    // process the batch here; offsetRanges is already populated for it
    ()
  }
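For the exactly-once goal in the question, the captured offsetRanges is then persisted once the batch has been processed; the 0.8 direct API does not commit offsets for you. A minimal sketch, where saveOffsets is a hypothetical stand-in for your offset store (ZooKeeper, a database, etc.):

// Hypothetical persistence hook; replace with ZooKeeper, a database, etc.
def saveOffsets(ranges: Array[OffsetRange]): Unit =
  ranges.foreach { r =>
    // OffsetRange exposes topic, partition, fromOffset and untilOffset.
    println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }

kafkaStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(_._2)
  .foreachRDD { rdd =>
    rdd.foreachPartition(records => ()) // hypothetical: your per-record work
    saveOffsets(offsetRanges)           // persist only after the batch succeeds
  }

This ordering works because the body of transform runs on the driver once per batch, before the corresponding foreachRDD output operation, so offsetRanges always describes the batch currently being processed.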
answered Mar 19 '23 by Yuval Itzchakov