Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka consumer.poll returns no records

The first N calls to poll return nothing when I register a consumer with a new group id.

I want to test that when I call a service, a Kafka event is published. The issue is that whenever I change the groupId, the first N polls return nothing. I understand that Kafka first registers the consumer when polling but I find the number of polls (time) required to register the consumer to be too random.

The consumer configuration:

Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));

Steps:

  1. Before every test I call consumer.poll(Duration.ofSeconds(5)) just to make sure that the consumer is registered and the offset set.
  2. I call the service and assert on the response. If I check Kafka using the UI, the event is published.
  3. I call consumer.poll(Duration.ofSeconds(5)) and hopefully receive some records. This is the step that fails.

Is there a way to make sure that the second poll always returns the record? I tried to make the first poll last for 1 minute (and I already think that 5 seconds is too much to wait for every test) and it would still some times work and some times not.

Thanks.

like image 777
Tudor Avatar asked May 02 '19 09:05

Tudor


1 Answers

The reason it's not working with your "new groupId", is that you are in "latest" mode.

Default value is "latest", you need either to be in "earliest" mode or to poll a first time with your "new groupId" or commit offset for this "new groupId" for this topic.

You need to register the "groupId" to the topic, not the consumer.

like image 79
Gremi64 Avatar answered Sep 17 '22 16:09

Gremi64