Is it possible to obtain a specific message's offset in Kafka + Spark Streaming?

I'm trying to obtain and store the offset for a specific message in Kafka by using Spark Direct Stream. Looking at the Spark documentation, it is simple to obtain the offset ranges for each partition, but what I need is to store the start offset for each message of a topic after a full scan of the queue.

Beniamino Del Pizzo asked Jun 01 '16


People also ask

Is Kafka offset unique?

The offset is a unique id assigned to each message within a partition. Its most important use is that it identifies a message by id inside its partition. In other words, it is the position within a partition of the next message to be sent to a consumer.

What is offset in Spark streaming?

Each partition maintains the messages it has received in a sequential order where they are identified by an offset, also known as a position. Developers can take advantage of using offsets in their application to control the position of where their Spark Streaming job reads from, but it does require offset management.
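For example, Spark's direct Kafka stream exposes the offset range each batch RDD covers, which is the usual hook for offset management. A minimal sketch, assuming a DStream named directKafkaStream obtained from KafkaUtils.createDirectStream (as in the answer below):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Each batch RDD produced by the direct stream implements HasOffsetRanges,
// so the per-partition start/end offsets of that batch can be read and persisted.
directKafkaStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: offsets [${r.fromOffset}, ${r.untilOffset})")
  }
}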

Does Kafka keep track of offset?

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
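As a side illustration (separate from the Spark code in the answer below), the standalone Kafka consumer API can report that position directly; the broker address, group id, and topic below are placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
props.put("group.id", "offset-inspector")          // placeholder group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)         // placeholder topic/partition
consumer.assign(Collections.singletonList(tp))

// position() is the offset of the next record this consumer will read;
// committed() is the last offset durably committed for this group (may be null)
println(s"next offset to read: ${consumer.position(tp)}")
println(s"last committed offset: ${consumer.committed(tp)}")
consumer.close()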


1 Answer

Yes, you can use the MessageAndMetadata version of createDirectStream, which allows you to access message metadata.

The example below returns a DStream of Tuple3.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

// Start each topic/partition from an explicit offset
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition = TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1 = TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)

// The message handler turns every record into (topic, offset, message),
// so each message's own offset travels with it through the DStream
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Long, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.offset, mmd.message())
)

In the above example, tuple3._1 holds the topic, tuple3._2 the offset, and tuple3._3 the message.
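If you then need to persist those per-message offsets, here is a minimal sketch of consuming the tuples; saveOffset is a hypothetical helper standing in for whatever store you use (ZooKeeper, a database, etc.):

messagesDStream.foreachRDD { rdd =>
  rdd.foreach { case (topic, offset, message) =>
    // store the offset of this individual message alongside its payload
    // saveOffset(topic, offset)   // hypothetical persistence call
    println(s"$topic @ $offset : $message")
  }
}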

Hope this helps!

avr answered Sep 30 '22