 

Kafka Consumer Assignment returns Empty Set

I have subscribed to a Kafka topic as below. I need to run some logic only after the consumer has been assigned a partition.

However, consumer.assignment() returns an empty set no matter how long I wait. If I drop the while loop and just call consumer.poll(), I do get the records from the topic. Can anyone tell me why this is happening?

    consumer.subscribe(topics);
    boolean isAssigned = true; // loop flag, cleared once a partition shows up
    Set<TopicPartition> assigned = Collections.emptySet();
    while (isAssigned) {
        assigned = consumer.assignment();
        if (!assigned.isEmpty()) {
            isAssigned = false;
        }
    }

    // consumer props
    Properties props = new Properties();
    props.put("bootstrap.servers", "xxx:9092,yyy:9092");
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://xxx:8081");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("max.poll.records", "100");
asked Jan 22 '19 at 21:01 by rookie


People also ask

What is consumer assignment?

Each consumer in the group is assigned a set of partitions. An assignment essentially functions as an exclusive lock on a given set of partitions. Each partition is assigned to exactly one consumer per group, and only the consumer that owns that partition will be able to read its data while the assignment persists.
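To make the "exactly one consumer per partition" rule concrete, here is a simplified, stdlib-only model that deals partitions out round-robin style. It is not Kafka's assignor code: the class name `AssignmentModel`, the consumer IDs and the partition counts are hypothetical, and Kafka's real assignors (range, round-robin, sticky, cooperative-sticky) also handle multiple topics and previous ownership.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of group partition assignment (hypothetical; not Kafka's implementation). */
public class AssignmentModel {

    /**
     * Deals partitions 0..partitionCount-1 out to consumers in turn, so every
     * partition ends up owned by exactly one consumer in the group.
     */
    public static Map<String, List<Integer>> roundRobin(List<String> consumerIds, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String id : consumerIds) {
            assignment.put(id, new ArrayList<>());
        }
        if (consumerIds.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumerIds.get(p % consumerIds.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, two consumers: one consumer owns two partitions.
        System.out.println(roundRobin(List.of("consumer-a", "consumer-b"), 3));
        // prints {consumer-a=[0, 2], consumer-b=[1]}

        // More consumers than partitions: the extra consumer owns nothing and sits idle.
        System.out.println(roundRobin(List.of("c1", "c2", "c3"), 2));
        // prints {c1=[0], c2=[1], c3=[]}
    }
}
```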

What is poll() in Kafka?

The poll() method is what a Kafka consumer calls to fetch records from the topics it has subscribed to. It takes a timeout argument: the maximum time to block waiting for records before returning, possibly with an empty batch.

What does commitSync() do in Kafka?

commitSync() commits offsets to Kafka for the specified topics and partitions and blocks until the commit succeeds or fails. Called with no arguments, it commits the offsets returned by the last poll(). The committed offsets are used on the first fetch after every rebalance and also on startup.
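One detail worth knowing when passing explicit offsets to commitSync(): the committed offset should be the offset of the next record to consume, i.e. the last processed offset + 1. A minimal sketch, assuming the kafka-clients library is on the classpath; `CommitHelper`, `nextOffsets` and `processAndCommit` are hypothetical names, and `process` stands in for your application logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical helper, a sketch rather than a library API. */
public class CommitHelper {

    /** For each partition, the offset to commit: last processed offset + 1. */
    public static <K, V> Map<TopicPartition, OffsetAndMetadata> nextOffsets(
            Iterable<ConsumerRecord<K, V>> processed) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<K, V> rec : processed) {
            TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
            // Records of one partition arrive in offset order, so the last one wins.
            offsets.put(tp, new OffsetAndMetadata(rec.offset() + 1));
        }
        return offsets;
    }

    /** Processes one batch, then blocks until its offsets are committed. */
    public static <K, V> void processAndCommit(KafkaConsumer<K, V> consumer,
                                               ConsumerRecords<K, V> batch,
                                               Consumer<ConsumerRecord<K, V>> process) {
        for (ConsumerRecord<K, V> rec : batch) {
            process.accept(rec);
        }
        // May throw CommitFailedException if this consumer no longer owns the partitions.
        consumer.commitSync(nextOffsets(batch));
    }
}
```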

What happens when Kafka consumer fails?

If the consumer fails after writing the data to the database but before saving the offsets back to Kafka, it will reprocess the same records next time it runs and save them to the database once more.
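That failure window can be reproduced with a small stdlib-only simulation. It is a toy model, not Kafka client code: the in-memory "log" (one partition), the "database" list and the `crashBeforeCommit` flag are hypothetical stand-ins showing why saving results before committing offsets gives at-least-once delivery.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of at-least-once processing (hypothetical; no Kafka involved). */
public class AtLeastOnceDemo {
    private final List<String> log;                    // stand-in for one partition
    private final List<String> database = new ArrayList<>();
    private int committedOffset = 0;                   // next offset to read after a restart

    public AtLeastOnceDemo(List<String> log) {
        this.log = List.copyOf(log);
    }

    /** One consumer run: save results, then commit, unless it "crashes" in between. */
    public void runOnce(boolean crashBeforeCommit) {
        int start = committedOffset;                   // resume from the last committed offset
        database.addAll(log.subList(start, log.size())); // step 1: write results
        if (crashBeforeCommit) {
            return;                                    // crash: step 2 never happens
        }
        committedOffset = log.size();                  // step 2: commit "next offset to read"
    }

    public List<String> database() { return List.copyOf(database); }

    public int committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        AtLeastOnceDemo demo = new AtLeastOnceDemo(List.of("a", "b", "c"));
        demo.runOnce(true);   // writes a, b, c, then dies before committing
        demo.runOnce(false);  // restarts from offset 0 and writes a, b, c again
        System.out.println(demo.database());        // prints [a, b, c, a, b, c]
        System.out.println(demo.committedOffset()); // prints 3
    }
}
```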


1 Answer

Until you call poll(), the consumer is just idling.

Only after poll() is invoked does it connect to the cluster, join the consumer group, get assigned partitions, and start fetching messages.

This is mentioned in the Javadoc:

After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked.

Once poll() has been called and the consumer has joined the group, you can use assignment(), or register a ConsumerRebalanceListener (via subscribe(topics, listener)) to be notified when partitions are assigned to or revoked from your consumer instance.
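Applied to the question, one option is to run the "after assignment" logic from a ConsumerRebalanceListener. Per the Javadoc, onPartitionsAssigned is invoked only from within poll(), after the rebalance completes and before the consumer starts fetching data, so the hook runs before any record from the new assignment is handed to you. A minimal sketch, assuming kafka-clients 2.0+ (for poll(Duration)) and a consumer built from the question's props; `AssignmentAwareConsumer`, `StartupListener`, `onFirstAssignment` and `handle` are hypothetical names.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical wrapper, a sketch rather than a library API. */
public class AssignmentAwareConsumer {

    /** Runs a one-time hook the first time this consumer is given partitions. */
    public static final class StartupListener implements ConsumerRebalanceListener {
        private final AtomicBoolean started = new AtomicBoolean(false);
        private final Consumer<Collection<TopicPartition>> onFirstAssignment;

        public StartupListener(Consumer<Collection<TopicPartition>> onFirstAssignment) {
            this.onFirstAssignment = onFirstAssignment;
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Called inside poll(); an empty collection means "joined, but owns nothing".
            if (!partitions.isEmpty() && started.compareAndSet(false, true)) {
                onFirstAssignment.accept(partitions);
            }
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit or clean up per-partition state here if needed.
        }

        public boolean hasStarted() {
            return started.get();
        }
    }

    /** Subscribes with the listener; poll() is what triggers the group join and assignment. */
    public static <K, V> void run(KafkaConsumer<K, V> consumer,
                                  List<String> topics,
                                  Consumer<Collection<TopicPartition>> onFirstAssignment,
                                  Consumer<ConsumerRecord<K, V>> handle) {
        consumer.subscribe(topics, new StartupListener(onFirstAssignment));
        while (true) { // stop from another thread with consumer.wakeup()
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<K, V> rec : records) {
                handle.accept(rec);
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // enable.auto.commit=false in the question's props
            }
        }
    }
}
```

If you prefer to keep the question's wait loop, call consumer.poll(...) inside it instead of spinning on assignment(); any records that poll returns while you wait must then be processed rather than discarded.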

answered Sep 22 '22 at 20:09 by Mickael Maison