I have created multi thread consumer app to work upon various partitions. Looking into various blogs i came to know about 'max.poll.records' property, in order to get control over the set of records from given topic, partition.(so it can quickly come out of Records loop and hence call cons.poll() to keep alive)
Problem is my processing logic takes time to process each record. upon starting Cons-2 both start to work on same partition as Cons-1 still did not went for re-balance (i.e. cons.poll() not happened yet).
Increasing consumers so they can re-balance them selves, cons.poll() will not occur unless all records are processed.
I may not go for 'session.timeout.ms' as starting new consumer may also start working on same partition as of Cons-1.
I have tried setting property using :
props.put("max.poll.records",1);
props.put("max.poll.records","1");
but neither changed the no. of records from poll.
I am using Apache Kafka 9 and Below API.
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.9.0.1_1</version>
</dependency>
Kafka consumer has a configuration max. poll. records which controls the maximum number of records returned in a single call to poll() and its default value is 500.
So the max. poll. records control the number of messages read at one poll. This allows us to tune the consumption based on the number of messages to be processed without timing out.
The Kafka consumer poll() method fetches records in sequential order from a specified topic/partitions. This poll() method is how Kafka clients read data from Kafka. When the poll() method is called, the consumer will fetch records from the last consumed offset.
See this answer for more details. max.poll.interval.ms default value is five minutes, so if your consumerRecords. forEach takes longer than that your consumer will be considered dead.
max.poll.records
property released in Kafka-0.10.0. It's not available in Kafka 0.9.0.1 version. See KAFKA-3007 task in the release notes.
If your processing of records took much time, the below link might be helpful.
AdvancedConsumer.java
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With