I have the following code
class Consumer<T>(val consumer: KafkaConsumer<String, T>) {
    fun run() {
        consumer.seekToEnd(emptyList())
        val pollDuration = 30L // seconds
        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
        }
    }
}
The topic the consumer is subscribed to continuously receives records. Occasionally, the consumer crashes during the processing step. When the consumer is then restarted, I want it to consume from the latest offset on the topic (i.e. ignore records that were published to the topic while the consumer was down). I thought the seekToEnd() method would ensure that. However, it seems to have no effect at all: the consumer resumes from the offset at which it crashed.
What is the correct way to use seekToEnd()?
Edit: The consumer is created with the following configs
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
    val props = setupConfig(valueDeserializer)
    Common.setupConsumerSecurityProtocol(props)
    return createConsumer(props)
}

fun setupConfig(valueDeserializer: String): Properties {
    // Configuration setup
    val props = Properties()
    props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
    props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
    props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
    props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
    props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs
    props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
    return props
}

fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
    val consumer = KafkaConsumer<String, T>(props)
    consumer.subscribe(listOf(config.kafka.inputTopic))
    return consumer
}
By default, as a consumer reads messages from Kafka, it periodically commits its current offset (defined as the offset of the next message to be read) for each partition it is reading back to Kafka.
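For reference, a rough sketch (not part of the question) of the two consumer settings that control this periodic commit, shown with Kafka's default values; the configuration in the question disables auto-commit and commits manually instead:
val props = Properties()
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"       // default: offsets are committed automatically
props[ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG] = "5000"  // default: commit every 5 seconds while polling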
How long a committed offset remains available depends on the offset retention period; by default, Kafka retains committed consumer offsets for one week (7 days). If a group's offset has expired (or was never committed), the auto.offset.reset policy decides where consumption starts.
Earliest — when the consumer application is initialized for the first time, or binds to a topic and wants to consume the historical messages present in it, the consumer should set auto.offset.reset to earliest. Latest — this is the default offset reset value if you have not configured any; the consumer then starts from the newest records only.
How to change a consumer offset? Use the kafka-consumer-groups.sh tool to change or reset offsets. You have to specify the topic and consumer group and use the --reset-offsets flag (together with an option such as --to-latest and, to apply the change, --execute).
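If you would rather do that reset from code than via the CLI, a rough sketch using the Kafka AdminClient (available since Kafka 2.5) could look like the following; the bootstrap servers, group id, topic, and partition count are placeholders here, and the group must have no active members while its offsets are altered:
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.OffsetSpec
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import java.util.Properties

fun resetGroupToLatest(bootstrapServers: String, groupId: String, topic: String, partitionCount: Int) {
    val props = Properties()
    props["bootstrap.servers"] = bootstrapServers
    AdminClient.create(props).use { admin ->
        // Partition count is assumed known; in practice it can be looked up via describeTopics()
        val partitions = (0 until partitionCount).map { TopicPartition(topic, it) }
        // Look up the current end offset of every partition...
        val latest = admin.listOffsets(partitions.associateWith { OffsetSpec.latest() }).all().get()
        // ...and move the group's committed offsets there
        val newOffsets = latest.mapValues { (_, info) -> OffsetAndMetadata(info.offset()) }
        admin.alterConsumerGroupOffsets(groupId, newOffsets).all().get()
    }
}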
Let's say, for instance, that a consumer group consumes 12 messages before failing. When the consumer starts up again, it will continue from where it left off, because its last committed offset (or position) is stored by Kafka in the __consumer_offsets topic (older setups stored it in ZooKeeper). If you are ever curious about where a group's offset currently is, you can check it with the kafka-consumer-groups tool.
A related approach: set auto.offset.reset to "earliest" in your consumer configuration. This will cause the consumer to fetch from the beginning of a topic partition whenever the consumer group does not have a committed offset for it.
Every Kafka consumer belongs to a consumer group, identified by the consumer's group.id configuration setting. A consumer group contains one or more consumers, and the consumers within the group are assigned topic partitions from which they consume messages.
I found a solution!
I needed to add a dummy poll as part of the consumer initialization process. Since several KafkaConsumer methods are evaluated lazily, a dummy poll is needed so that partitions actually get assigned to the consumer. Without it, the consumer tries to seek to the end of an empty set of partitions, so seekToEnd() has no effect.
It is also important that the dummy poll duration is long enough for the partitions to get assigned. For instance, with consumer.poll(Duration.ofSeconds(1)), the partitions did not get assigned before the program moved on to the next method call (i.e. seekToEnd()).
Working code could look something like this
class Consumer<T>(val consumer: KafkaConsumer<String, T>) {
    fun run() {
        // Initialization
        val pollDuration = 30L // seconds
        consumer.poll(Duration.ofSeconds(pollDuration)) // Dummy poll to get partitions assigned
        // Seek to end and commit the new offset
        consumer.seekToEnd(emptyList())
        consumer.commitSync()
        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
        }
    }
}
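As a side note, one way to avoid depending on the dummy poll being "long enough" is to do the seek from a ConsumerRebalanceListener, which the consumer calls once partitions have actually been assigned. The sketch below only illustrates that idea (the class name is made up, not from the code above), and note that it will also skip to the end after every later rebalance, which may or may not be what you want:
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

class SkipToEndListener<K, V>(private val consumer: KafkaConsumer<K, V>) : ConsumerRebalanceListener {
    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
        // Invoked on the polling thread once the assignment is known, so seeking here is safe
        consumer.seekToEnd(partitions)
    }
    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
        // Nothing to do; offsets are committed manually in the poll loop
    }
}

// Usage: pass the listener when subscribing, then poll as usual
// consumer.subscribe(listOf(config.kafka.inputTopic), SkipToEndListener(consumer))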
The seekToEnd method requires information about the actual partitions (in Kafka terms, TopicPartitions) from whose end you want your consumer to read.
I am not familiar with the Kotlin API, but checking the JavaDocs on the KafkaConsumer method seekToEnd you will see that it asks for a collection of TopicPartitions.
As you are currently passing emptyList(), it has no effect, just as you observed: per the JavaDoc, an empty collection means "all currently assigned partitions", and before the first successful poll nothing has been assigned yet.
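To make that concrete, here is a minimal sketch that gives seekToEnd() explicit TopicPartitions by using manual assignment via assign() instead of subscribe(); the topic name is taken from the question's config, the rest is an assumption:
import org.apache.kafka.common.TopicPartition

// Discover the topic's partitions and assign them to this consumer directly;
// with assign() there is no rebalance to wait for, so the assignment is known immediately
val partitions = consumer.partitionsFor(config.kafka.inputTopic)
    .map { TopicPartition(it.topic(), it.partition()) }
consumer.assign(partitions)
// Now seekToEnd() has concrete partitions; the seek itself is still evaluated lazily on the next poll()
consumer.seekToEnd(partitions)
Note that assign() cannot be combined with subscribe() on the same consumer, so the subscribe call in createConsumer() would have to be dropped in this variant.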