The first N calls to poll return nothing when I register a consumer with a new group id.
I want to test that when I call a service, a Kafka event is published. The issue is that whenever I change the groupId, the first N polls return nothing. I understand that Kafka first registers the consumer when polling but I find the number of polls (time) required to register the consumer to be too random.
The consumer configuration:
Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
Steps:
consumer.poll(Duration.ofSeconds(5))
just to make sure that the consumer is registered and the offset set.consumer.poll(Duration.ofSeconds(5))
and hopefully receive some records. This is the step that fails.Is there a way to make sure that the second poll always returns the record? I tried to make the first poll last for 1 minute (and I already think that 5 seconds is too much to wait for every test) and it would still some times work and some times not.
Thanks.
The reason it's not working with your "new groupId", is that you are in "latest" mode.
Default value is "latest", you need either to be in "earliest" mode or to poll a first time with your "new groupId" or commit offset for this "new groupId" for this topic.
You need to register the "groupId" to the topic, not the consumer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With