Kafka consumer does not start from latest message

I want a Kafka consumer that starts from the latest message in a topic.

Here is the Java code:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
    properties.setProperty("bootstrap.servers", "localhost:9092"); // host:port; 9092 is Kafka's default port
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));
}

@Override
public StreamHandler call() throws Exception
{
    while (true) 
    {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        {
            System.out.println(rec.value());
        }
    }
}

Although auto.offset.reset is set to latest, the consumer starts from messages that are two days old and only then catches up with the latest messages.

What am I missing?

asked May 12 '17 at 20:05 by Ehsan


People also ask

What is the difference between earliest and latest in Kafka?

Earliest: when a consumer application starts for the first time (or binds to a topic) and wants to consume the historical messages already present in the topic, it should set auto.offset.reset to earliest. Latest: this is the default value if you have not configured one; the consumer starts at the end of each partition and receives only new messages.

How do I restart a Kafka consumer?

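Use kafka-consumer-groups.sh to change or reset a group's committed offsets. Pass the broker (--bootstrap-server), the consumer group (--group), the topic (--topic), the --reset-offsets flag with a target such as --to-earliest or --to-latest, and --execute to apply the change; without --execute the tool only prints the planned offsets. The group must have no running consumers while you reset it.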
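The same reset can also be done from Java through the Admin API. The sketch below is a hedged programmatic counterpart to `--reset-offsets --to-latest`, not a description of how the CLI is implemented. It assumes kafka-clients 3.1 or later (for `DescribeTopicsResult.allTopicNames()`), a broker at `localhost:9092`, and the group `test` and topic `mytopic` from the question; the class name `ResetGroupToLatest` and helper `toCommits` are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

// Hypothetical class: moves a group's committed offsets to the end of a topic.
public class ResetGroupToLatest {

    /** Wraps plain offsets in the type alterConsumerGroupOffsets expects (hypothetical helper). */
    static Map<TopicPartition, OffsetAndMetadata> toCommits(Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
        offsets.forEach((tp, offset) -> commits.put(tp, new OffsetAndMetadata(offset)));
        return commits;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        String groupId = "test";   // group from the question
        String topic = "mytopic";  // topic from the question

        try (Admin admin = Admin.create(props)) {
            // 1. Look up every partition of the topic.
            TopicDescription description = admin.describeTopics(Collections.singletonList(topic))
                    .allTopicNames().get().get(topic);
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            for (TopicPartitionInfo p : description.partitions()) {
                request.put(new TopicPartition(topic, p.partition()), OffsetSpec.latest());
            }
            // 2. Ask the brokers for each partition's current end offset.
            Map<TopicPartition, ListOffsetsResultInfo> ends = admin.listOffsets(request).all().get();
            Map<TopicPartition, Long> latest = new HashMap<>();
            ends.forEach((tp, info) -> latest.put(tp, info.offset()));
            // 3. Overwrite the committed offsets; this only succeeds while the group is empty.
            admin.alterConsumerGroupOffsets(groupId, toCommits(latest)).all().get();
        }
    }
}
```

Run it while the consumers of `test` are stopped; the next consumer to join the group then resumes from the end offsets recorded here.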

Are Kafka messages consumed in order?

Kafka guarantees ordering only within a partition, not across a whole topic. If you send message A and then message B to partition 0, a consumer of partition 0 reads A before B.


2 Answers

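Have you run this same code before with the same group.id? The auto.offset.reset setting is only used when there is no committed offset stored for your consumer group (or when the committed offset is no longer valid, for example because that data has been deleted). With enable.auto.commit set to true, the consumer commits its position while it runs. So if you ran the example previously, say two days ago, and then run it again, it resumes from the last committed position.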
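To make the rule concrete, here is a toy model in plain Java. It is not Kafka code and uses no Kafka API; `OffsetResetModel` and `startOffset` are hypothetical names, and the exception for `none` merely stands in for the exceptions the real client throws. It shows why a committed offset from two days ago wins over `latest`.

```java
// Hypothetical toy model (not Kafka code) of how a consumer picks its start offset for one partition.
public class OffsetResetModel {

    /**
     * @param committed offset committed by the group, or null if the group never committed one
     * @param logStart  earliest offset still stored in the partition
     * @param logEnd    offset that the next produced record will receive
     * @param reset     value of auto.offset.reset: "earliest", "latest" or "none"
     */
    static long startOffset(Long committed, long logStart, long logEnd, String reset) {
        // A committed offset that still lies inside the log always wins; auto.offset.reset is ignored.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        // Only when there is no usable committed offset does auto.offset.reset matter.
        switch (reset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // Stand-in for the exception the real client raises when reset is "none".
                throw new IllegalStateException("no valid offset and auto.offset.reset=" + reset);
        }
    }

    public static void main(String[] args) {
        // Group "test" committed offset 1200 two days ago; the log now ends at 5000.
        System.out.println(startOffset(1200L, 0, 5000, "latest")); // 1200: replays two days of data
        // A brand-new group has nothing committed, so "latest" applies.
        System.out.println(startOffset(null, 0, 5000, "latest"));  // 5000
    }
}
```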

Use seekToEnd() if you want to move to the end of the topic manually; call it on partitions that are already assigned to the consumer.
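A minimal sketch of the seekToEnd() approach, assuming kafka-clients 2.0 or later (for `poll(Duration)`), a broker at `localhost:9092`, and the topic and group names from the question; `LatestOnlyConsumer` is a hypothetical class name. Because seekToEnd() only accepts partitions currently assigned to the consumer, the call sits in a `ConsumerRebalanceListener`, which `poll()` invokes once the group has handed out partitions.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical class: ignores committed offsets and always starts at the end of each partition.
public class LatestOnlyConsumer {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "test");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps());
        consumer.subscribe(Collections.singletonList("mytopic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to clean up in this sketch.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called from inside poll(); overrides any committed position for these partitions.
                consumer.seekToEnd(partitions);
            }
        });
        try {
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(200))) {
                    System.out.println(rec.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

Since the listener runs on every rebalance, any records produced after the last committed position are skipped each time partitions are reassigned. That matches a "latest only" goal but is the wrong choice if every message must be processed.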

See https://stackoverflow.com/a/32392174/1392894 for a slightly more thorough discussion of this.

answered Nov 30 '22 at 10:11 by Brian Ecker


If you want to control the position of your offsets manually, you need to set enable.auto.commit=false.
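A minimal sketch of a consumer with auto-commit disabled that commits only after it has handled each batch. It assumes kafka-clients 2.0 or later (for `poll(Duration)`), a broker at `localhost:9092`, and the topic and group names from the question; `ManualCommitConsumer` is a hypothetical class name.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical class: the application, not a background timer, decides when offsets are committed.
public class ManualCommitConsumer {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false"); // offsets are committed only by commitSync() below
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps())) {
            consumer.subscribe(Collections.singletonList("mytopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(rec.value());
                }
                if (!records.isEmpty()) {
                    // Commits the positions returned by the last poll(), i.e. after this batch was printed.
                    consumer.commitSync();
                }
            }
        }
    }
}
```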

If you want to move the position to the end of every partition, call seekToEnd().
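A sketch that positions a consumer at the end of every partition of `mytopic`. Note that it swaps in manual assignment (`assign()`) in place of `subscribe()`, so no consumer group is involved and no committed offsets are read or written. It assumes kafka-clients 2.2 or later (where `group.id` may be omitted with `assign()`) and a broker at `localhost:9092`; `TailAllPartitions` and `toTopicPartitions` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical class: tails every partition of a topic without joining a consumer group.
public class TailAllPartitions {

    /** Converts partition metadata into the TopicPartition handles that assign()/seekToEnd() take. */
    static List<TopicPartition> toTopicPartitions(List<PartitionInfo> infos) {
        List<TopicPartition> result = new ArrayList<>();
        for (PartitionInfo info : infos) {
            result.add(new TopicPartition(info.topic(), info.partition()));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("enable.auto.commit", "false"); // no group.id, so nothing may be committed
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = toTopicPartitions(consumer.partitionsFor("mytopic"));
            consumer.assign(partitions);    // manual assignment: no rebalances, no committed offsets
            consumer.seekToEnd(partitions); // evaluated lazily on the next poll() or position()
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(200))) {
                    System.out.println(rec.value());
                }
            }
        }
    }
}
```

Each run starts at the current end regardless of earlier runs, at the cost of losing group features such as automatic load balancing across several consumer instances.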

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(java.util.Collection)

answered Nov 30 '22 at 11:11 by Hans Jespersen