
Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

I am testing the Kafka High Level Consumer using the ConsumerGroupExample code from the Kafka site. I would like to retrieve all the existing messages on the topic called "test" that I have in the Kafka server config. According to other blogs, auto.offset.reset should be set to "smallest" to be able to get all messages:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    // Fall back to the earliest available offset when the group has no committed offset
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");

    return new ConsumerConfig(props);
}

The question I really have is this: which Java API call for the High Level Consumer is the equivalent of:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

user3307283 asked Feb 13 '14 18:02


2 Answers

Basically, every time a consumer starts under a new group ID, it has no committed offsets, so with auto.offset.reset set to "smallest" (as in your config) it reads the topic from the beginning. If you just want to consume from the beginning each time for testing purposes, initialise your consumer with a new group ID on every run. Here's how I did it:

properties.put("group.id", UUID.randomUUID().toString());

and the consumer read messages from the beginning each time!
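Putting the two settings together, a minimal sketch of the configuration might look like the following. It uses only java.util so that it runs on its own; the class name FromBeginningConfig and the build method are hypothetical names for illustration, and it assumes the old ZooKeeper-based high level consumer from the question.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper (not part of Kafka): builds settings for the old
// ZooKeeper-based high level consumer so that each run starts from the
// earliest available offset.
final class FromBeginningConfig {

    static Properties build(String zookeeper) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // A fresh group ID has no committed offsets, so auto.offset.reset applies.
        props.put("group.id", UUID.randomUUID().toString());
        // "smallest" means: start from the earliest offset still available.
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181");
        // With the Kafka client on the classpath, these properties would be
        // passed on as in the question: new ConsumerConfig(props).
        props.list(System.out);
    }
}
```

Keep in mind that every run registers a brand-new consumer group, so this is better suited to testing than to a long-running service.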

Karishma Gulati answered Sep 21 '22 12:09


Looks like you need to use the "low level SimpleConsumer API":

For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in here.

This example worked for getting all messages from a topic when run with the following arguments (note that the port is the Kafka port, not the ZooKeeper port; the topics were set up from this example):

10 my-replicated-topic 0 localhost 9092
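For reference, here is a minimal sketch of how those five arguments could be read, assuming they follow the order used by the SimpleConsumer example's main method (maximum number of messages to read, topic, partition, seed broker host, broker port). The class name SimpleConsumerArgs is hypothetical, and the sketch deliberately leaves out the Kafka calls themselves.

```java
// Hypothetical holder for the example's command-line arguments.
// Assumed order: <maxReads> <topic> <partition> <seedBroker> <port>
final class SimpleConsumerArgs {
    final long maxReads;     // stop after reading this many messages
    final String topic;
    final int partition;
    final String seedBroker; // Kafka broker host used to look up the partition leader
    final int port;          // Kafka broker port, not the ZooKeeper port

    SimpleConsumerArgs(String[] args) {
        if (args.length != 5) {
            throw new IllegalArgumentException(
                "expected: <maxReads> <topic> <partition> <seedBroker> <port>");
        }
        maxReads = Long.parseLong(args[0]);
        topic = args[1];
        partition = Integer.parseInt(args[2]);
        seedBroker = args[3];
        port = Integer.parseInt(args[4]);
    }

    public static void main(String[] args) {
        SimpleConsumerArgs parsed = new SimpleConsumerArgs(
            new String[] {"10", "my-replicated-topic", "0", "localhost", "9092"});
        System.out.println(parsed.topic + "/" + parsed.partition
            + " via " + parsed.seedBroker + ":" + parsed.port
            + ", up to " + parsed.maxReads + " messages");
    }
}
```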

Specifically, there is a method to get readOffset which takes kafka.api.OffsetRequest.EarliestTime():

long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

Here is another post that may provide some alternative ideas on how to sort this out: How to get data from old offset point in Kafka?

pherris answered Sep 25 '22 12:09