
Read Kafka messages starting from a specific offset using the high-level API

Tags:

apache-kafka

I hope I'm not mistaken, but I remember the Kafka documentation saying that with the high-level API you can't start reading messages from a specific offset, and that this was expected to change.

Is it now possible to use the high-level API to read messages from a specific partition and a specific offset? Could you please give me an example of how to do it?

I am using Kafka 0.8.1.1.

Thanks in advance.

user1002065 asked Dec 16 '15 11:12




2 Answers

You can do this with the new consumer API introduced in Kafka 0.9, using KafkaConsumer.seek:

http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

public void seek(TopicPartition partition, long offset)

From the Javadoc: "Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets."
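A minimal sketch of how seek might be used to read one partition from a chosen offset. It assumes a recent kafka-clients library (2.0 or later, where poll takes a Duration; the 0.9 client used poll(long timeout) instead). The broker address localhost:9092, the group id seek-example, the topic topicname, the offset 10, the String deserializers, and the class and helper names are all placeholders for illustration, not anything from the question.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExample {

    // Hypothetical helper: builds a basic consumer configuration.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Auto-commit is disabled so this sketch does not move the group's committed offsets.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed values: adjust the topic, partition and offset to your setup.
        TopicPartition partition = new TopicPartition("topicname", 0);
        long startOffset = 10L;

        try (KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(consumerProps("localhost:9092", "seek-example"))) {
            // Manually assign the partition; seek() requires the partition to be assigned.
            consumer.assign(Collections.singletonList(partition));

            // Override the position used by the next poll().
            consumer.seek(partition, startOffset);

            // A single poll may return no records; real code would poll in a loop.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```

Using assign rather than subscribe keeps the example simple: with subscribe, partitions are only assigned after a rebalance, so seek would typically have to be called from a rebalance listener.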

Anatoly Deyneka answered Oct 21 '22 03:10


Kafka 0.8.1.1 can use ZooKeeper to store offsets for each consumer group. If your consumer is configured to commit offsets to ZooKeeper, you just need to manually set the starting offset for the topic and partition under your consumer group's path in ZooKeeper. Connect to ZooKeeper and use the set command, where the value is the offset as a long:

set /consumers/[groupId]/offsets/[topic]/[partitionId] [offset]

For example, to set offset 10 for partition 0 of topicname for the spark-app consumer group:

set /consumers/spark-app/offsets/topicname/0 10

When a consumer starts consuming messages from Kafka, it always starts from the last committed offset. If that last committed offset is not valid for any reason, the consumer falls back to the behavior configured by the auto.offset.reset property.
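The start-up rule described above can be modeled in a few lines. This is only an illustrative sketch, not Kafka's actual code: the class StartOffsetSketch and the method resolveStartOffset are hypothetical names, and "smallest" and "largest" are the auto.offset.reset values used by the 0.8 high-level consumer.

```java
public class StartOffsetSketch {

    /**
     * Simplified model of where a consumer starts reading a partition.
     *
     * @param committed       last committed offset, or null if none exists
     * @param logStart        oldest offset still available in the partition
     * @param logEnd          offset that the next produced message will get
     * @param autoOffsetReset "smallest" or "largest" (0.8 naming)
     */
    static long resolveStartOffset(Long committed, long logStart, long logEnd,
                                   String autoOffsetReset) {
        // A committed offset inside the available range is used as-is.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        // Otherwise fall back to the configured reset policy.
        switch (autoOffsetReset) {
            case "smallest":
                return logStart;
            case "largest":
                return logEnd;
            default:
                throw new IllegalStateException(
                        "No valid offset and no usable reset policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Valid committed offset: consumption resumes there.
        System.out.println(resolveStartOffset(10L, 0, 100, "largest"));   // prints 10
        // Committed offset out of range: the reset policy decides.
        System.out.println(resolveStartOffset(500L, 0, 100, "smallest")); // prints 0
        // No committed offset at all: the reset policy decides.
        System.out.println(resolveStartOffset(null, 0, 100, "largest"));  // prints 100
    }
}
```

In this model, writing offset 10 into ZooKeeper corresponds to the first case, provided offset 10 still exists in the partition.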

Hope this helps.

Beniamino Del Pizzo answered Oct 21 '22 05:10