
Can I create an RDD from a kafka topic if I do not know the until offset?

KafkaUtils.createRDD takes the offsetRanges as a parameter. I do not know the until offset of the topic I want to read from. I want to read at most the first 30 messages in the topic.

I see there is a KafkaCluster#getLatestLeaderOffsets, but it is annotated as a DeveloperApi.

Is there any public way to determine the earliest and latest offsets for a topic?

Balpeck asked May 01 '16 16:05



1 Answer

It's not that simple, because only the individual brokers know the latest offset for a given topic / partition.

You can do an OffsetRequest. The following will return the earliest and latest offsets for a topic / partition (it's Scala, but you should be able to get the idea if you don't use Scala).

Note that you have to use a SimpleConsumer connected to the broker that is the leader for the requested partition. What I usually do is create a SimpleConsumer for each of my brokers, then issue a metadata request to get the partition-to-leader mapping. Then, for each partition, I do this:

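// Assumes the old Kafka 0.8.x javaapi SimpleConsumer.
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

// Returns (earliest, latest) for one partition; consumer must be connected to that partition's leader.
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  // Ask for up to 1000 offsets at or before "latest"; the response lists them newest first.
  val reqInfo = Map[TopicAndPartition, PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)))
  // The javaapi request takes a java.util.Map, so convert the Scala Map explicitly.
  val req = new kafka.javaapi.OffsetRequest(reqInfo.asJava, kafka.api.OffsetRequest.CurrentVersion, "offReq")
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  if (offsets.nonEmpty) (offsets.last, offsets.head)
  else (0L, -1L) // no offsets returned
}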
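
To tie this back to the question (reading at most the first 30 messages), here is a minimal sketch of how the (earliest, latest) pair could be turned into the bounds of an offset range. The helper name firstN is hypothetical and not part of Spark or Kafka. It assumes the latest offset is the position of the next message to be written, so it can serve directly as an exclusive until offset.

```scala
// Hypothetical helper: compute (fromOffset, untilOffset) covering at most
// the first n messages of a partition, given its earliest and latest offsets.
// untilOffset is exclusive, matching how Spark's OffsetRange treats it.
def firstN(earliest: Long, latest: Long, n: Int): (Long, Long) = {
  val from = earliest
  // Never read past the end of the log, and never produce until < from
  // (e.g. for the (0, -1) pair returned when no offsets are available).
  val until = math.max(from, math.min(earliest + n, latest))
  (from, until)
}
```

The resulting pair could then be passed, together with the topic and partition, to OffsetRange.create(topic, partition, from, until) when building the offsetRanges array for KafkaUtils.createRDD.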

Hope this helps.

David Griffin answered Sep 21 '22 23:09