Kafka Consumer - Poll behaviour

I'm facing some serious problems trying to implement a solution for my needs regarding KafkaConsumer (>= 0.9).

Let's imagine I have a function that has to read just n messages from a Kafka topic.

For example: getMsgs(5) --> gets the next 5 Kafka messages in the topic.

So I have a loop that looks like the one below (edited with the actual, correct parameters). In this case, the consumer's max.poll.records parameter was set to 1, so the loop only iterated once per poll. Several different consumers (some of which iterated through many messages) shared an abstract parent class (this one), which is why it's coded this way. The numMss part was ad hoc for this consumer.

for (boolean exit = false; !exit;)
{
    ConsumerRecords<String, String> records = consumer.poll(config.pollTime);
    for (ConsumerRecord<String, String> r : records)
    {
        processRecord(r); // do my things
        numMss++;
        if (numMss == maximum) // maximum = 5
        {
            exit = true;
            break;
        }
    }
}

Taking this into account, the problem is that poll() could fetch more than 5 messages. For example, if it fetches 10 messages, my code will lose the other 5 messages forever, since Kafka will consider them already consumed.

I tried committing the offset, but it doesn't seem to work:

    consumer.commitSync(Collections.singletonMap(partition,
            new OffsetAndMetadata(record.offset() + 1)));
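
For context, since the snippet doesn't show where partition and record come from, here's a minimal sketch of how that commit sits in the loop (the getMsgs signature, pollTimeMs, and the String generics are illustrative assumptions; processRecord is the same helper used in the loop above):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    void getMsgs(KafkaConsumer<String, String> consumer, int maximum, long pollTimeMs) {
        int numMss = 0;
        boolean exit = false;
        while (!exit) {
            ConsumerRecords<String, String> records = consumer.poll(pollTimeMs);
            for (ConsumerRecord<String, String> r : records) {
                processRecord(r); // do my things
                numMss++;
                // Commit the offset just past the record we processed, so a
                // restarted consumer resumes at the next unread message.
                TopicPartition tp = new TopicPartition(r.topic(), r.partition());
                consumer.commitSync(Collections.singletonMap(
                        tp, new OffsetAndMetadata(r.offset() + 1)));
                if (numMss == maximum) {
                    exit = true;
                    break;
                }
            }
        }
    }

One caveat worth checking: manual commits like this only stick if enable.auto.commit is false. With auto-commit on, the consumer periodically commits its current position, which after a poll() is the offset past the whole fetched batch (offset 11 in the example above), which would explain the behaviour described below.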

Even with this offset commit, whenever I relaunch the consumer it doesn't start from the 6th message (remember, I just wanted 5 messages), but from the 11th (since the first poll consumed 10 messages).

Is there any solution for this, or (most likely) am I missing something?

Thanks in advance!!

asked Jun 21 '16 by aran


People also ask

How does Kafka consumer poll?

The Kafka consumer poll() method fetches records in sequential order from the subscribed topics/partitions. This poll() method is how Kafka clients read data from Kafka. When poll() is called, the consumer fetches records starting from the last consumed offset.
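
As a minimal sketch of that pattern (the topic name, the String generics, and the consumer/running variables are illustrative placeholders):

    // Assumes a configured KafkaConsumer<String, String> named consumer
    // and a volatile boolean flag named running.
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (running) {
        // Each poll resumes from the position after the records it last returned.
        ConsumerRecords<String, String> records = consumer.poll(100L);
        for (ConsumerRecord<String, String> r : records) {
            System.out.printf("offset = %d, value = %s%n", r.offset(), r.value());
        }
    }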

What is the default poll interval of Kafka consumer?

The default value of max.poll.interval.ms is five minutes, so if processing the records returned by one poll (e.g., a consumerRecords.forEach loop) takes longer than that, your consumer will be considered dead.
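
If processing legitimately needs more time, the interval can be raised in the consumer configuration; a sketch (the value is arbitrary):

    props.put("max.poll.interval.ms", "600000"); // 10 minutes instead of the default 5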

Why does Kafka use polling?

The number of consumers and the number of partitions are not necessarily the same. Kafka's poll(long) method helps implement functions such as message acquisition, partition rebalancing, and heartbeat detection between consumers and Kafka brokers.

What happens if Kafka consumer is down?

If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.


1 Answer

You can set max.poll.records to whatever number you like, such that at most you will get that many records on each poll.
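
For example, a minimal sketch of such a configuration (the broker address, group id, and topic name are placeholders, not from the question; max.poll.records is available from the 0.10 consumer onward):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("max.poll.records", "5"); // each poll() returns at most 5 records

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(1000L); // <= 5 records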

For the use case you stated in this problem, you don't have to commit offsets explicitly yourself. You can just set enable.auto.commit to true and set auto.offset.reset to earliest, so that it kicks in when there are no committed offsets for your group.id (in other words, when you are about to start reading a partition for the very first time). Once you have a group.id and some consumer offsets stored in Kafka, and your Kafka consumer process dies, it will continue from the last committed offset. This is the default behavior: when a consumer starts, it first checks whether there are any committed offsets, and if so, it continues from the last committed offset; auto.offset.reset won't kick in.
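
Continuing the sketch above, those two settings would just be added to the same configuration:

    props.put("enable.auto.commit", "true");    // offsets are committed for you
    props.put("auto.offset.reset", "earliest"); // only applies when the group
                                                // has no committed offsets yet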

answered Sep 23 '22 by user1870400