 

How to read data using Kafka Consumer API from beginning?

Can anyone tell me how to read messages using the Kafka Consumer API from the beginning of the topic every time I run the consumer?

asked Feb 17 '15 by Nits


People also ask

How do I read messages from the beginning of Kafka?

If you want to process a topic from its beginning, you can simply start a new consumer group (i.e., choose an unused group.id) and set auto.offset.reset=earliest. Because there are no committed offsets for a new group, the auto offset reset will trigger and the topic will be consumed from its beginning.
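
As a minimal sketch of that combination, assuming the Java client (the broker address and topic name are placeholders):

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromBeginning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // An unused group id means there are no committed offsets for the group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fresh-" + UUID.randomUUID());
        // With no committed offsets, earliest makes the consumer start at the beginning
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // poll in a loop as usual; records arrive from offset 0
        }
    }
}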

How does consumer get data from Kafka?

The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group. It is common for Kafka consumers to do high-latency operations such as writing to a database or performing a time-consuming computation on the data.
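
For illustration, every instance started with the same group.id joins one consumer group, and Kafka divides the topic's partitions among the group's members (the group name here is made up):

// Start several copies of the same consumer program with this setting;
// Kafka balances the topic's partitions across them, so each record
// is delivered to only one consumer in the group.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");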


10 Answers

This works with the 0.9.x consumer. Basically, when you create a consumer you need to assign it a consumer group id using the property ConsumerConfig.GROUP_ID_CONFIG. Generate the consumer group id randomly every time you start the consumer, doing something like properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (properties is an instance of java.util.Properties that you will pass to the constructor new KafkaConsumer(properties)).

Generating the group id randomly means that the new consumer group doesn't have any offset associated with it in Kafka. So what we have to do after this is to set a policy for this scenario. As the documentation for the auto.offset.reset property says:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

So from the options listed above, we need to choose the earliest policy so that the new consumer group starts from the beginning every time.

Your code in Java will look something like this:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

The only thing left to figure out is this: when you have multiple consumers that belong to the same consumer group but are distributed across machines, how do you generate one random id and share it between those instances so that they all join the same consumer group?
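
One hedged sketch of that idea: have whatever starts your instances generate the id once and pass the same value to each of them, for example through an environment variable (CONSUMER_GROUP_ID is a made-up name), falling back to a random id when running a single local instance:

// Hypothetical: the deployment tooling generates one id per run and
// exports the same CONSUMER_GROUP_ID value to every instance.
String groupId = System.getenv("CONSUMER_GROUP_ID");
if (groupId == null) {
    groupId = UUID.randomUUID().toString(); // single instance: any fresh id works
}
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);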

Hope it helps!

answered Oct 20 '22 by Nautilus


One option is to use a unique group id each time you start, which means Kafka will send you the messages in the topic from the beginning. Do something like this when you set your properties for the KafkaConsumer:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

The other option is to use consumer.seekToBeginning(consumer.assignment()), but this will not work until the consumer has joined the group and been assigned partitions, which happens when the consumer calls the poll method. So call poll(), then do a seekToBeginning(), and then call poll() again if you want all the records from the start. It's a little hacky, but this seems to be the most reliable way to do it as of the 0.9 release.

// At this point the consumer has not yet joined the group, so it has no
// partition assignment and seekToBeginning() won't work.
// Calling poll() triggers the group join and partition assignment.
consumer.poll(0);
// Now the consumer is "alive" and has its partitions
consumer.seekToBeginning(consumer.assignment());
// Now consume from the beginning
ConsumerRecords<String, String> records = consumer.poll(0);
answered Oct 20 '22 by ucsunil


props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

If you simply avoid saving any offsets, the consumer will always reset to the beginning.

In order for this to work, you can never use the commit API (that's what I mean by avoiding saving any offsets). Disabling auto commit does not count as using the commit API.

I've done this a lot and it works for me, especially when developing and testing. For prod I prefer skm's answer (https://stackoverflow.com/a/47530912/1213475), since I always want commits in prod; they're a simple way to monitor consumption and consumer lag.
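
A minimal sketch of such a commit-free loop, assuming props already contains the bootstrap servers, deserializers, and a group.id, and the usual imports are in place (the topic name is a placeholder):

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(r -> System.out.println(r.value()));
        // Deliberately no commitSync()/commitAsync(): since nothing is ever
        // committed, auto.offset.reset=earliest applies again on every restart.
    }
}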

answered Oct 20 '22 by offroff


One possible solution is to use an implementation of ConsumerRebalanceListener while subscribing to one or more topics. The ConsumerRebalanceListener provides callback methods for when partitions are assigned to or revoked from a consumer. The following code sample illustrates this:

public class SkillsConsumer {

    private String topic;

    private KafkaConsumer<String, String> consumer;

    private static final int POLL_TIMEOUT = 5000;

    public SkillsConsumer(String topic) {
        this.topic = topic;
        Properties properties = ConsumerUtil.getConsumerProperties();
        properties.put("group.id", "consumer-skills");
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Collections.singletonList(this.topic),
                new PartitionOffsetAssignerListener(this.consumer));
    }
}

public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {

    private KafkaConsumer<String, String> consumer;

    public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // rewind all newly assigned partitions to the beginning
        consumer.seekToBeginning(partitions);
    }
}

Now whenever the partitions are assigned to the consumer, each partition will be read from the beginning.

answered Oct 20 '22 by skm


1) https://stackoverflow.com/a/17084401/3821653

2) http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3CCAOG_4QYz2ynH45a8kXb8qw7xw4vDRRwNqMn5j9ERFxJ8RfKGCg@mail.gmail.com%3E

To reset the consumer group, you can delete the ZooKeeper group id path (this applies to the old ZooKeeper-based consumer, which keeps its offsets under /consumers):

import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);
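
For the modern consumer, whose offsets are stored in Kafka itself rather than in ZooKeeper, a comparable reset is to delete the group (it must have no active members) with the admin client; a sketch, with the broker address and group name as placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ResetGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Deleting the group discards its committed offsets, so a consumer
            // restarting with auto.offset.reset=earliest reads from the start.
            admin.deleteConsumerGroups(Collections.singletonList("my-group")).all().get();
        }
    }
}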
answered Oct 20 '22 by KingJulien


So for me, what worked was a combination of the suggestions above. The key change was to include

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

and a randomly generated group id each time. But this alone didn't work for me; for some reason, the first time I polled the consumer it never got any records. I had to hack it to get it to work:

consumer.poll(0); // without this, the statement below never got any records
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(100));

I'm new to Kafka and have no idea why this is happening, but for anyone else still trying to get this to work, I hope this helps.

answered Oct 20 '22 by karthiks3000


This is my code to read messages from the beginning (using Java 11):

try (var consumer = new KafkaConsumer<String, String>(config)) {

    consumer.subscribe(Set.of(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions);
        }
    });

    // polling messages
}

You can see the full code example here:

https://gist.github.com/vndung/4c9527b3aeafec5d3245c7a3b921f8b1

answered Oct 20 '22 by dungvo


If you are using the Java consumer API, more specifically org.apache.kafka.clients.consumer.Consumer, you can try the seek* methods:

consumer.seekToBeginning(consumer.assignment());

Here, consumer.assignment() returns all the partitions assigned to the given consumer, and seekToBeginning will start from the earliest offset for that collection of partitions.
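
Note that consumer.assignment() is empty until the consumer has actually joined the group, which is driven by poll, so in practice the call is combined with an initial poll; a sketch, with the topic name as a placeholder:

consumer.subscribe(Collections.singletonList("my-topic"));
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100)); // drives the group join until partitions are assigned
}
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));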

answered Oct 20 '22 by Saideep Sambaraju


While using the old high-level consumer, set props.put("auto.offset.reset", "smallest"); when creating the ConsumerConfig. As with the new consumer, this only takes effect when no offsets have been committed for the group, so combine it with a fresh group.id to re-read from the beginning.
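
For reference, a sketch of that with the long-deprecated pre-0.9 high-level consumer API (from memory, so treat it as approximate; the ZooKeeper address and group id are placeholders):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
// a fresh group.id, since auto.offset.reset only applies when the group
// has no offsets stored in ZooKeeper yet
props.put("group.id", "my-fresh-group");
props.put("auto.offset.reset", "smallest"); // old-consumer equivalent of earliest

ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));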

answered Oct 20 '22 by user2720864


To always read from offset 0 without creating a new group id every time:

    // ... Assuming the props have been set properly.
    // ... enable.auto.commit and auto.offset.reset as default

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(topic));
    consumer.poll(0);  // without this, the assignment will be empty. 
    consumer.assignment().forEach(t -> {
        System.out.printf("Set %s to offset 0%n", t.toString());
        consumer.seek(t, 0);
    });
    while (true) {
     // ... consumer polls messages as usual.
    }
answered Oct 20 '22 by rzch