I have subscribed to a Kafka topic as shown below. I need to run some logic only after the consumer has been assigned a partition.
However, consumer.assignment() comes back as an empty set no matter how long I wait. If I remove the while loop and call consumer.poll() instead, I do get records from the topic. Can anyone tell me why this is happening?
consumer.subscribe(topics);
Set<TopicPartition> assigned = Collections.emptySet();
while (isAssigned) {
    assigned = consumer.assignment();
    if (!assigned.isEmpty()) {
        isAssigned = false;
    }
}
//consumer props
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");
Each consumer in the group is assigned a set of partitions. An assignment essentially functions as an exclusive lock on a given set of partitions. Each partition is assigned to exactly one consumer per group, and only the consumer that owns that partition will be able to read its data while the assignment persists.
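The "exactly one owner per partition" rule can be illustrated with a small standalone sketch. This is only a toy round-robin distribution written for this explanation; the class and method names are hypothetical, and it is not how Kafka's own partition assignors are implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    // Hypothetical helper: hands out partitions 0..partitionCount-1 in
    // round-robin order, so every partition ends up with exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String consumer : consumers) {
            owned.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            owned.get(owner).add(partition);
        }
        return owned;
    }

    public static void main(String[] args) {
        // prints {consumer-a=[0, 2, 4], consumer-b=[1, 3]}
        System.out.println(assign(List.of("consumer-a", "consumer-b"), 5));
    }
}
```

No partition number appears in more than one list, which is the property the paragraph above describes for a single consumer group.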
The poll() method is the function a Kafka consumer calls to retrieve records from the topics it is subscribed to. When calling poll(), the consumer provides a timeout argument: the maximum amount of time to block waiting for records before returning.
commitSync(offsets) commits the specified offsets for the specified topics and partitions to Kafka. The committed offsets are used on the first fetch after every rebalance and also on startup.
If the consumer fails after writing the data to the database but before saving the offsets back to Kafka, it will reprocess the same records next time it runs and save them to the database once more.
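A minimal sketch of that write-then-commit order, assuming auto-commit is disabled as in the question's properties. The class name, the `processOnce` method, and `saveToDatabase` are hypothetical placeholders; the 500 ms timeout is an arbitrary choice. The Kafka calls used are `poll(Duration)` and `commitSync(Map<TopicPartition, OffsetAndMetadata>)`.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitAfterWrite {

    // Hypothetical placeholder for the database write mentioned above.
    static void saveToDatabase(ConsumerRecord<String, Object> record) {
        // application-specific persistence would go here
    }

    // Polls one batch, writes it out, then commits the offsets for that batch.
    static void processOnce(KafkaConsumer<String, Object> consumer) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, Object> record : records) {
            saveToDatabase(record);
            // The committed offset is the position of the NEXT record to read.
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // A crash before this line means the batch is read again after restart.
        consumer.commitSync(offsets);
    }
}
```

Because the commit happens after the write, a failure between the two steps leads to reprocessing rather than data loss, so the database write should tolerate duplicates.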
Until you call poll(), the consumer is just idling.
Only after poll() is invoked will it initiate a connection to the cluster, get assigned partitions, and attempt to fetch messages.
This is mentioned in the Javadoc:
After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked.
Once you start calling poll(), you can use assignment() or register a ConsumerRebalanceListener to be notified when partitions are assigned to or revoked from your consumer instance.
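As a rough sketch of the listener approach, the logic that must run after assignment can live in onPartitionsAssigned, which the consumer invokes from inside poll(). The class name, the `run` method, the println calls, and the 500 ms timeout are assumptions made for illustration; `subscribe(Collection, ConsumerRebalanceListener)` and `poll(Duration)` are the client calls being shown.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignmentAwareConsumer {

    static void run(KafkaConsumer<String, Object> consumer, List<String> topics) {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Run the "only after assignment" logic here.
                System.out.println("Assigned: " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }
        });

        while (true) {
            // Joining the group and the listener callbacks both happen inside poll().
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}
```

If you prefer to keep the original loop, calling poll() inside it would also let assignment() become non-empty, but any records returned by those poll() calls would then need to be handled rather than discarded.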