
Kafka consumer missing messages while consuming messages in loop

I am running my consumer code in a loop because of memory constraints: each pass consumes a batch of messages, commits it, and then loads the data into tables.

The following code runs on each iteration of the loop:

// here is the main part of the component,
// a piece of code executed in the row
// loop
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
System.out.println("Consumer created");
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
try {
    while (pollFlag) {
        ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
        if (records.isEmpty()) {
            globalMap.put("emptyRecordsFlag", false); // Passing the flag value to previous component to end loop
            break;
        }
        for (ConsumerRecord<String, String> record : records) {
            listPayload.add(record.value()); // Adding the messages to list
            i++;
            if (i >= msgbtch) {
                pollFlag = false; // Assigning flag value to end the poll at 5000 messages
                break;
            }
        }
    }
    globalMap.put("ConsumerObj", consumer);
} catch (Exception e) {
    System.out.println("Error Consuming Msg: " + e);
    // TODO: handle exception
    //consumer.close();
}
row3.payload = String.valueOf(listPayload); // Passing the message data to next component
System.out.println("Committing");
consumer.commitSync();
System.out.println("Closing");
consumer.close();

But for some reason I seem to be missing a few messages. I believe this has something to do with consumer rebalancing or committing.

How can I check if my consumer is ready to consume the next batch of messages from the start without missing any messages?

asked Feb 01 '26 02:02 by Sharad R. Telkar



1 Answer

Update: I was able to figure out the issue myself. Each poll() call has already fetched a whole batch of messages into records, but while looping over them I hit the following condition:

if (i >= msgbtch) {
    pollFlag = false; // Assigning flag value to end the poll at 5000 messages
    break;
}

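The loop breaks before every message in records has been added to the list, so the remaining messages of that batch are never inserted. Because commitSync() then commits the position reached by the last poll(), those messages are also skipped on the next run. I have removed the break condition and it's working fine.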
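To make the difference concrete, here is a minimal sketch that needs no Kafka dependency. It is a simulation under stated assumptions, not the original component: each inner list stands in for one poll() result, and the class and method names (BatchDrain, drainWithEarlyBreak, drainWholeBatches) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchDrain {

    // Mirrors the original loop: stops in the middle of a polled batch,
    // so the rest of that batch never reaches the list.
    static List<String> drainWithEarlyBreak(Iterator<List<String>> polls, int msgBatch) {
        List<String> payload = new ArrayList<>();
        boolean pollFlag = true;
        int i = 0;
        while (pollFlag && polls.hasNext()) {
            List<String> records = polls.next(); // stand-in for consumer.poll(...)
            if (records.isEmpty()) {
                break;
            }
            for (String record : records) {
                payload.add(record);
                i++;
                if (i >= msgBatch) {
                    pollFlag = false;
                    break; // remaining records of this batch are dropped
                }
            }
        }
        return payload;
    }

    // Mirrors the fix: always finish the current batch, then stop polling
    // once the target count has been reached.
    static List<String> drainWholeBatches(Iterator<List<String>> polls, int msgBatch) {
        List<String> payload = new ArrayList<>();
        while (payload.size() < msgBatch && polls.hasNext()) {
            List<String> records = polls.next(); // stand-in for consumer.poll(...)
            if (records.isEmpty()) {
                break;
            }
            payload.addAll(records); // keep every record that was fetched
        }
        return payload;
    }
}
```

With two simulated polls of three records each and a limit of four, the first method returns four records and silently drops two, while the second returns all six. The trade-off is that the list can overshoot the limit, by at most one poll's worth of records (in a real consumer that is bounded by the max.poll.records setting). If a hard cap matters, an alternative would be to keep the break but commit only the offsets actually consumed, using commitSync(Map<TopicPartition, OffsetAndMetadata>) instead of the no-argument commitSync().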

answered Feb 03 '26 15:02 by Sharad R. Telkar
