
Apache Kafka - Autocommit = false and no commit

My Producer writes 10 messages with key 0,1,2,3,4,5,6,7,8,9

My consumer (with auto commit = false) reads the messages during the first loop, but when it reads message "2" I force an exception so that I don't commit. (The exception is thrown only in the first loop; this is controlled by the forceError flag.)

During the second loop the consumer should read all messages 0,1,2,3,4,5,6,7,8,9 again and commit, but it doesn't. Why?

The log is:

write message with key: 0
write message with key: 1
write message with key: 2
write message with key: 3
write message with key: 4
write message with key: 5
write message with key: 6
write message with key: 7
write message with key: 8
write message with key: 9

1 loop, consumerRecords.count: 10
read message with key: 0
read message with key: 1
read message with key: 2
java.lang.Exception: error don't commit
2 loop, consumerRecords.count(): 0
3 loop, consumerRecords.count(): 0
4 loop, consumerRecords.count(): 0
5 loop, consumerRecords.count(): 0
6 loop, consumerRecords.count(): 0
7 loop, consumerRecords.count(): 0
8 loop, consumerRecords.count(): 0
and so on....

Java source:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

/*
maven dependency:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

* */
public class KafkaConsumerDemo {
    public static void main(String[] args) {

    String topic = UUID.randomUUID().toString();
    /// producer - write messages with key 0,1,2,3,4,5,6,7,8,9
    {
        Properties properties = new Properties();

        // kafka bootstrap server
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        // producer acks
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "3");
        properties.setProperty("linger.ms", "1");

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);


        for (int key = 0; key < 10; key++) {
            System.out.println("write message with key: " + key);
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, Integer.toString(key), "message that has key: " + Integer.toString(key));
            producer.send(producerRecord);
        }


        producer.close();
    }

    /* consumer - read messages and throw an exception during the first loop:
       the first loop reads 0,1,2 and forces an exception, so nothing is committed;
       the second loop should read 0,1,2,3,4,5,6,7,8,9 and commit, but it does not :(
     */
    {
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));

        boolean forceError = true;
        int n = 0;
        while (true) {
            n++;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            System.out.println(n + " loop, consumerRecords.count: " + consumerRecords.count());

            try {
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("read message with key: " + consumerRecord.key());
                    if (forceError && consumerRecord.key().equals("2")) {
                        forceError = false;
                        throw new Exception("error don't commit");
                    }

                }
                if (!consumerRecords.isEmpty()) {
                    System.out.println("commit ok");
                    kafkaConsumer.commitSync();
                }
            } catch (Exception e) {
                System.out.println(e);
            }

        }
    }
}
}
gekomad asked Mar 07 '23 21:03


1 Answer

The key line in the output is:

1 loop, consumerRecords.count: 10

Basically, the first call to poll() returned all 10 records. The consumer has received all the messages and handed them to you in a ConsumerRecords container. What you do with them is then up to you; from the consumer's point of view, its job is done.

Even if you don't commit, the consumer keeps track of its current position in memory. Assuming the topic has a single partition, the 10 records occupy offsets 0 to 9, so the position is now offset 10, which is the end of the log. poll() therefore won't return anything until new messages are produced to the topic.

Because you did not commit and the reset policy is set to earliest, restarting the consumer (or unsubscribing and re-subscribing) will make it receive all 10 messages again.
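
To make the difference between the in-memory position and the committed offset concrete, here is a minimal sketch that models one partition and one consumer in plain Java. It is not the Kafka client API: ToyConsumer and all of its methods are hypothetical names made up for illustration, and the model assumes a single partition with no committed offset at start (so "earliest" means offset 0).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition and one consumer; NOT the Kafka client API. */
public class PositionVsCommitDemo {

    /** Hypothetical stand-in for a consumer reading a single partition. */
    public static class ToyConsumer {
        private final List<String> log; // the partition's records, index == offset
        private long position;          // next offset poll() will fetch (in memory only)
        private long committed;         // last committed offset (survives a "restart")

        public ToyConsumer(List<String> log, long committedOffset) {
            this.log = log;
            this.committed = committedOffset;
            this.position = committedOffset; // a (re)started consumer resumes here
        }

        /** Returns every record from position to the end of the log and advances position. */
        public List<String> poll() {
            List<String> batch = new ArrayList<>(log.subList((int) position, log.size()));
            position = log.size();
            return batch;
        }

        public void commitSync() { committed = position; }
        public void seek(long offset) { position = offset; }
        public long position() { return position; }
        public long committed() { return committed; }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int key = 0; key < 10; key++) {
            log.add(Integer.toString(key));
        }

        // Assumption: nothing committed yet, reset policy "earliest" -> start at offset 0.
        ToyConsumer consumer = new ToyConsumer(log, 0);

        // First poll hands over all 10 records; processing "fails", so we never commit.
        System.out.println("1st poll: " + consumer.poll().size()
                + " records, position=" + consumer.position()
                + ", committed=" + consumer.committed());
        // prints: 1st poll: 10 records, position=10, committed=0

        // The position already sits at the end of the log, so the next poll is empty.
        System.out.println("2nd poll: " + consumer.poll().size() + " records");
        // prints: 2nd poll: 0 records

        // A "restarted" consumer begins again at the committed offset.
        ToyConsumer restarted = new ToyConsumer(log, consumer.committed());
        System.out.println("after restart: " + restarted.poll().size() + " records");
        // prints: after restart: 10 records

        // Alternatively, rewind the same consumer to the committed offset and commit afterwards.
        consumer.seek(consumer.committed());
        System.out.println("after seek: " + consumer.poll().size() + " records");
        // prints: after seek: 10 records
        consumer.commitSync();
        System.out.println("committed=" + consumer.committed());
        // prints: committed=10
    }
}
```

In the real client, the rewind step would presumably map to calling KafkaConsumer.seek(TopicPartition, long) from the catch block, using the offset returned by committed(TopicPartition) or falling back to seekToBeginning(...) when nothing has been committed. Treat that mapping as a suggestion to check against the client version in use, not as tested code.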

Mickael Maison answered May 04 '23 22:05