Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to acknowledge current offset in spring kafka for manual commit

I am using Spring Kafka first time and I am not able to use Acknowledgement.acknowledge() method for manual commit in my consumer code. please let me know if anything missing in my consumer configuration or listener code. or else is there other way to handle acknowledge offset based on condition. Here i'm looking solution like if the offset is not committed/ acknowledge manually, it should pick same message/offset by consumer.

Configuration

import java.util.HashMap; import java.util.Map;  import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;  @EnableKafka @Configuration public class ConsumerConfig {      @Value(value = "${kafka.bootstrapAddress}")     private String bootstrapAddress;      @Value(value = "${kafka.groupId}")     private String groupId;      @Bean     public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {         Map<String, Object> props = new HashMap<String, Object>();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,                 StringDeserializer.class);         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                 StringDeserializer.class);         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();         factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(                 props));         factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);         factory.getContainerProperties().setSyncCommits(true);         return factory;     } } 

Listener

private static int value = 1;  @KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory") public void listenPEN_RE(@Payload String message,         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,         @Header(KafkaHeaders.OFFSET) int offsets,         Acknowledgment acknowledgment) {      if (value%2==0){         acknowledgment.acknowledge();     }     value++; } 
like image 370
user2550140 Avatar asked Nov 22 '17 06:11

user2550140


People also ask

How do you manually commit offset in Kafka?

Method Summary Manually assign a list of partition to this consumer. Get the set of partitions currently assigned to this consumer. Close the consumer, waiting indefinitely for any needed cleanup. Commit offsets returned on the last poll() for all the subscribed list of topics and partition.

Which consumer in Kafka will commit the current offset?

By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka.

How do I acknowledge Kafka messages?

Once the messages are processed, consumer will send an acknowledgement to the Kafka broker. Once Kafka receives an acknowledgement, it changes the offset to the new value and updates it in the Zookeeper.

How offsets are maintained in Kafka?

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.


2 Answers

Set the enable-auto-commit property to false:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Set the ack-mode to MANUAL_IMMEDIATE:

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Then, in your consumer/listener code, you can commit the offset manually, like this:

@KafkaListener(topics = "testKafka") public void receive(ConsumerRecord<?, ?> consumerRecord,           Acknowledgment acknowledgment) {      System.out.println("Received message: ");     System.out.println(consumerRecord.value().toString());      acknowledgment.acknowledge(); } 

Update: I created a small POC for this. Check it out here, might help you.

like image 158
contactsunny Avatar answered Sep 22 '22 00:09

contactsunny


You need to do the following

1) Set enable-auto-commit property to false

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 

2) Set the ACK Mode to MANUL_IMMEDIATE

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); 

3) For processed records you need to call acknowledgment.acknowledge();

4) for failed records call acknowledgment.nack(10); Note: the nack method takes a long parameter which is the sleep time and it should be less than max.poll.interval.ms

Below is a sample code

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory") public void listenPEN_RE(@Payload String message,         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,         @Header(KafkaHeaders.OFFSET) int offsets,         Acknowledgment acknowledgment) {      if (value%2==0){         acknowledgment.acknowledge();     } else {         acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms     }     value++; } 
like image 42
Amit Nargund Avatar answered Sep 20 '22 00:09

Amit Nargund