I'm trying to obtain and store the offset for a specific message in Kafka by using Spark Direct Stream. Looking at the Spark documentation is simple to obtain the range offsets for each partition but what I need is to store the start offset for each message of a topic after a full scan of the queue.
OFFSET IN KAFKAThe offset is a unique id assigned to the partitions, which contains messages. The most important use is that it identifies the messages through id, which are available in the partitions. In other words, it is a position within a partition for the next message to be sent to a consumer.
Each partition maintains the messages it has received in a sequential order where they are identified by an offset, also known as a position. Developers can take advantage of using offsets in their application to control the position of where their Spark Streaming job reads from, but it does require offset management.
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
Yes, you can use MessageAndMetadata version of createDirectStream
which allows you to access message metadata
.
You can find example here which returns Dstream of tuple3
.
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
In above example tuple3._1
will have topic
, tuple3._2
will have offset
and tuple3._3
will have message
.
Hope this helps!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With