Spring Kafka consumer not able to consume records

We are using Spring Kafka to consume records in batches. We sometimes face an issue where the application starts and doesn't consume any records, even though there are enough unread messages. Instead we continuously see this info log:

[INFO]-[FetchSessionHandler:handleError:440] - [Consumer clientId=consumer-2, groupId=groupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1027: org.apache.kafka.common.errors.DisconnectException. 

Other people have faced this issue and everyone says to ignore it, since it is just an info log. And indeed, after some time the application starts picking up records without any intervention. But it is very unpredictable how long that might take :(

We didn't see this error when we were using Spring Cloud Stream. Not sure if we have missed any configuration in spring-kafka.

Has anyone faced this issue in the past? Please let us know if we are missing something. We have a huge load on our topics; if there is a lot of lag, could this happen?

We are using:

- Spring Kafka 2.2.2.RELEASE
- Spring Boot 2.1.2.RELEASE
- Kafka 0.10.0.1 (we understand it's very old; for unavoidable reasons we have to use it :()

Here is our code:

application.yml

    li.topics: CUSTOM.TOPIC.JSON
    spring:
      application:
        name: DataPublisher
      kafka:
        listener:
          type: batch
          ack-mode: manual_immediate
        consumer:
          enable-auto-commit: false
          max-poll-records: 500
          fetch-min-size: 1
          fetch-max-wait: 1000
          group-id: group-dev-02
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: CustomResourceDeserialiser  # must be the fully-qualified class name
          auto-offset-reset: earliest

Consumer:

public class CustomKafkaBatchConsumer {

  @KafkaListener(topics = "#{'${li.topics}'.split(',')}", id = "${spring.kafka.consumer.group-id}")
  public void receiveData(@Payload List<CustomResource> customResources,
      Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    // process the batch, then commit explicitly (ack-mode is manual_immediate)
    acknowledgment.acknowledge();
  }
}
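The SpEL expression `#{'${li.topics}'.split(',')}` simply splits the comma-separated `li.topics` property into the list of topics the listener subscribes to. A minimal plain-Java sketch of that resolution (the second topic name below is made up for illustration):

```java
public class TopicSplitter {

  // Mirrors what the SpEL expression does: the raw property value is
  // split on commas into individual topic names for @KafkaListener.
  static String[] splitTopics(String rawProperty) {
    return rawProperty.split(",");
  }

  public static void main(String[] args) {
    String liTopics = "CUSTOM.TOPIC.JSON,OTHER.TOPIC.JSON"; // hypothetical property value
    for (String topic : splitTopics(liTopics)) {
      System.out.println(topic);
    }
  }
}
```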

Deserialiser:

public class CustomResourceDeserialiser implements Deserializer<CustomResource> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  @Override
  public CustomResource deserialize(String topic, byte[] data) {
    if (data != null) {
      try {
        ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
        return objectMapper.readValue(data, CustomResource.class);
      } catch (IOException e) {
        log.error("Failed to deserialise payload from topic {}", topic, e);
      }
    }
    return null;
  }

  @Override
  public void close() {

  }
}
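As an aside, Spring Kafka ships its own JSON deserializer (`org.springframework.kafka.support.serializer.JsonDeserializer`), which can often replace a hand-rolled one. A hedged sketch of the consumer config, assuming a recent enough Spring Kafka version; the package and class names below are placeholders:

```yaml
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example"                  # placeholder package
        spring.json.value.default.type: com.example.CustomResource   # placeholder FQCN
```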
asked Dec 14 '25 by jhansi
2 Answers

This could be because of KAFKA-8052 (Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request), which is fixed in Kafka 2.3.0.

Unfortunately, as of Aug 21, 2019, Spring Cloud Stream hasn't upgraded its dependencies to the 2.3.0 release of kafka-clients yet.

You can try adding these as explicit dependencies in your Gradle build:

    compile ('org.apache.kafka:kafka-streams:2.3.0')
    compile ('org.apache.kafka:kafka-clients:2.3.0')
    compile ('org.apache.kafka:connect-json:2.3.0')
    compile ('org.apache.kafka:connect-api:2.3.0')
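If the build uses Maven with the Spring Boot parent instead of Gradle, the same client upgrade can be sketched by overriding the `kafka.version` property that `spring-boot-dependencies` manages (this assumes the project inherits from `spring-boot-starter-parent`; adjust to your build):

```xml
<!-- Overrides the kafka-clients version managed by spring-boot-dependencies. -->
<properties>
  <kafka.version>2.3.0</kafka.version>
</properties>
```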

Update

This could also be caused by Kafka broker/client incompatibility. If your cluster is behind the client version, you might see all kinds of odd problems such as this. For example, if your Kafka broker is on 1.x.x and your kafka-clients is on 2.x.x, this could happen.
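One way to check what the brokers actually support is the `kafka-broker-api-versions.sh` tool that ships with the Kafka distribution (the bootstrap address below is a placeholder for your cluster):

```
# Lists the API versions each broker supports; a newer client will negotiate
# down where it can, but very old brokers (e.g. 0.10.x) predate fetch
# sessions entirely, so session-related log noise is not surprising.
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```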

answered Dec 18 '25 by so-random-dude


I have faced the same problem before; the solution was either to decrease the partition count or to increase the number of consumers. In my case, we had ~100M records on 60 partitions, and I came across the same error when a single pod was running. I scaled to 30 pods (30 consumers) and the problem was solved.

answered Dec 18 '25 by Okan


