KafkaConsumer: `seekToEnd()` does not make consumer consume from latest offset

I have the following code:

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

class Consumer<V>(val consumer: KafkaConsumer<String, V>) {

    fun run() {
        consumer.seekToEnd(emptyList())
        val pollDuration = 30L // seconds

        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
        }
    }
}

The topic the consumer is subscribed to continuously receives records. Occasionally, the consumer crashes due to the processing step. When the consumer is then restarted, I want it to consume from the latest offset on the topic (i.e. ignore records that were published to the topic while the consumer was down). I thought the seekToEnd() method would ensure that. However, it seems the method has no effect at all: the consumer resumes from the offset at which it crashed.

What is the correct way to use seekToEnd()?

Edit: The consumer is created with the following configs

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties

fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
    val props = setupConfig(valueDeserializer)
    Common.setupConsumerSecurityProtocol(props)
    return createConsumer(props)
}

fun setupConfig(valueDeserializer: String): Properties {
    // Configuration setup
    val props = Properties()

    props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
    props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl

    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
    props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"

    props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
    props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs

    props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"

    return props
}

fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
    val consumer = KafkaConsumer<String, T>(props)
    consumer.subscribe(listOf(config.kafka.inputTopic))
    return consumer
}
asked Dec 03 '21 by Liverbird97

2 Answers

I found a solution!

I needed to add a dummy poll as part of the consumer initialization process. Since several of the KafkaConsumer methods are evaluated lazily, a dummy poll is needed to trigger the partition assignment. Without it, the consumer has no partitions assigned, so seekToEnd() has nothing to seek on and therefore has no effect.

It is important that the dummy poll duration is long enough for the partitions to get assigned. For instance, with consumer.poll(Duration.ofSeconds(1)), the partitions did not get time to be assigned before the program moved on to the next method call (i.e. seekToEnd()).

Working code could look something like this:

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

class Consumer<V>(val consumer: KafkaConsumer<String, V>) {

    fun run() {
        // Initialization
        val pollDuration = 30L // seconds
        consumer.poll(Duration.ofSeconds(pollDuration)) // dummy poll to trigger partition assignment

        // Seek to the end of the assigned partitions and commit the new position
        consumer.seekToEnd(emptyList())
        consumer.commitSync()

        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
        }
    }
}
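
As a side note, an alternative that avoids guessing a long-enough dummy poll duration is to perform the seek in a ConsumerRebalanceListener, which the consumer invokes from inside poll() as soon as partitions are assigned. A minimal sketch, assuming the same subscribe-based setup (subscribeFromEnd is a made-up helper name for illustration):

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Sketch: start from the latest offset the moment partitions are
// assigned, without relying on a fixed dummy-poll duration.
fun <V> subscribeFromEnd(consumer: KafkaConsumer<String, V>, topic: String) {
    consumer.subscribe(listOf(topic), object : ConsumerRebalanceListener {
        override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
            // Runs inside poll() right after the rebalance completes,
            // so the assigned partitions are known here
            consumer.seekToEnd(partitions)
        }

        override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
            // Nothing to do on revocation in this sketch
        }
    })
}

Since the listener fires before poll() returns any records, the first real poll already reads from the new end position.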
answered Oct 16 '22 by Liverbird97

The seekToEnd method requires information about the actual partitions (in Kafka terms, TopicPartitions) from which you want your consumer to read from the end.

I am not familiar with the Kotlin API, but checking the JavaDocs on the KafkaConsumer method seekToEnd, you will see that it asks for a collection of TopicPartitions.

As you are currently calling it with emptyList(), it applies to all currently assigned partitions, and at that point your consumer has none assigned yet, so the call has no impact at all, just as you observed.
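
For illustration, one way to end up with explicit partitions, assuming the subscribe-then-poll flow from the question (skipToEnd is a hypothetical helper name and the 5-second poll duration is an arbitrary placeholder):

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

fun <V> skipToEnd(consumer: KafkaConsumer<String, V>) {
    // A first poll triggers the group rebalance and partition assignment
    consumer.poll(Duration.ofSeconds(5))
    // assignment() now returns the consumer's own TopicPartitions,
    // giving seekToEnd() concrete partitions to act on
    consumer.seekToEnd(consumer.assignment())
}

Any records returned by that first poll are simply discarded, which matches the goal of skipping messages published while the consumer was down.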

answered Oct 16 '22 by Michael Heil