Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Acknowledgement.acknowledge() throwing exception in spring-kafka @KafkaListener

When I set enable.auto.commit to false and try to manually commit offset using annotation based spring-kafka @KafkaListener, I get a org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

I have a very simple code as follows:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message

And when I send a message from the producer, I get the following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message.

Endpoint handler details:

Method [public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]

Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]

Please help. TIA.

like image 296
Aravind Kv Avatar asked Jun 27 '17 19:06

Aravind Kv

1 Answers

You have to the set the container factory's containerProperties ackMode to MANUAL or MANUAL_IMMEDIATE to get an Acknowledgment object.

With other ack modes, the container is responsible for committing the offset.


Or set the ....ackMode property if using Spring Boot

like image 119
Gary Russell Avatar answered Nov 17 '22 15:11

Gary Russell