
Unable to set 'max.poll.records' on a Kafka consumer: cons.poll() still returns all records in the partition

I have created a multi-threaded consumer app to work on various partitions. From various blogs I learned about the 'max.poll.records' property, which limits the number of records returned for a given topic and partition, so that the consumer can get through the records loop quickly and call cons.poll() again to stay alive.

The problem is that my processing logic takes a long time for each record. When I start Cons-2, both consumers work on the same partition, because Cons-1 has not yet taken part in a rebalance (i.e. it has not called cons.poll() again).

Adding consumers so that they rebalance among themselves does not help, because cons.poll() is not called until all records are processed.

I would rather not rely on 'session.timeout.ms', since a newly started consumer may also begin working on the same partition as Cons-1.

I have tried setting the property using:

props.put("max.poll.records",1);
props.put("max.poll.records","1");

but neither changed the number of records returned from poll.

I am using Apache Kafka 0.9 and the API below.

<dependency>
    <groupId>org.apache.servicemix.bundles</groupId>
    <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
    <version>0.9.0.1_1</version>
</dependency>
asked May 24 '16 12:05 by usman


People also ask

What is Max poll in Kafka?

The Kafka consumer has a configuration, max.poll.records, which controls the maximum number of records returned in a single call to poll(); its default value is 500.

What is Max Poll records?

So max.poll.records controls the number of messages read in one poll. This allows us to tune consumption based on the number of messages to be processed without timing out.

How does Kafka consumer poll work?

The Kafka consumer poll() method fetches records in sequential order from a specified topic/partitions. This poll() method is how Kafka clients read data from Kafka. When the poll() method is called, the consumer will fetch records from the last consumed offset.

What is the default poll interval of Kafka consumer?

The default value of max.poll.interval.ms is five minutes, so if your consumerRecords.forEach takes longer than that, your consumer will be considered dead.
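To make that limit concrete, here is a rough back-of-the-envelope sketch. The class name `PollBudget` and the 2000 ms per-record cost are assumptions chosen for illustration; only the five-minute default comes from the text above. It estimates how many records a single poll can return before processing them overruns the interval.

```java
// Hypothetical helper, not part of any Kafka API.
class PollBudget {
    // Largest batch size whose total processing time fits within the poll interval.
    static long maxRecordsPerPoll(long maxPollIntervalMs, long perRecordMs) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        return maxPollIntervalMs / perRecordMs;
    }

    public static void main(String[] args) {
        // 300000 ms is the five-minute default; 2000 ms per record is an assumed cost.
        System.out.println(maxRecordsPerPoll(300_000L, 2_000L)); // prints 150
    }
}
```

Under these assumed numbers, the default of 500 records per poll would take about 1,000,000 ms to process, well past the interval, so max.poll.records would need to be 150 or lower.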


1 Answer

The max.poll.records property was introduced in Kafka 0.10.0. It is not available in Kafka 0.9.0.1. See the KAFKA-3007 task in the release notes.

If processing your records takes a long time, the link below might be helpful.
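As a minimal sketch, assuming the client library is upgraded to kafka-clients 0.10.0 or later, the consumer settings could be built roughly as follows. The class name `ConsumerProps`, the broker address, and the group id are placeholders and do not come from the question.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings intended for kafka-clients 0.10.0+.
class ConsumerProps {
    static Properties build(String bootstrapServers, String groupId, int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Takes effect only with a 0.10.0+ client; a 0.9.x client does not apply it.
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        return props;
    }
}
```

The resulting object, for example `ConsumerProps.build("localhost:9092", "demo-group", 1)`, would then be passed to the `KafkaConsumer` constructor in place of the properties shown in the question.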

AdvancedConsumer.java
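One common way to keep poll() being called while records are slow to process is to hand each batch to a worker thread and pause fetching until it finishes. The sketch below is not the linked AdvancedConsumer.java and uses no Kafka classes. `RecordSource` is a hypothetical stand-in for the consumer, assumed to return an empty list while paused, and the sketch shows only the threading shape.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HandOffLoop {
    // Hypothetical stand-in for a Kafka consumer. Assumed contract:
    // poll() returns an empty list while paused or when nothing is available.
    interface RecordSource {
        List<String> poll();
        void pause();
        void resume();
    }

    // Calls poll() on every iteration, even while a batch is still being processed.
    static void run(RecordSource source, Consumer<String> handler, int maxPolls)
            throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<?> inFlight = null;
        try {
            for (int i = 0; i < maxPolls; i++) {
                List<String> batch = source.poll();
                if (inFlight != null && inFlight.isDone()) {
                    inFlight.get();   // rethrows any failure from the worker
                    inFlight = null;
                    source.resume();  // ready to fetch the next batch
                }
                if (!batch.isEmpty()) {
                    source.pause();   // stop fetching until this batch is finished
                    inFlight = worker.submit(() -> batch.forEach(handler));
                }
            }
        } finally {
            worker.shutdown();
            // Wait for a batch that is still running when the loop ends.
            worker.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

With a real consumer, the pause and resume steps would correspond to the consumer's own pause and resume calls on its assigned partitions, and poll would block for its timeout instead of spinning. The exact signatures differ between client versions, so check them against the version in use.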

answered Sep 28 '22 10:09 by Kamal Chandraprakash