The Kafka consumer has a configuration max.poll.records, which controls the maximum number of records returned in a single call to poll(); its default value is 500. I have set it to a very high number so that I can get all the messages in a single poll.
However, poll() returns only a few thousand messages (roughly 6000) in a single call even though the topic has many more.
How can I further increase the number of messages read by a single consumer?
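Roughly the setup in play (a reconstruction; the broker address, group id, topic name, and the exact max.poll.records value are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
    props.put("group.id", "my-group");                  // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("max.poll.records", "100000");            // far above the default of 500

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    System.out.println(records.count());                // still only ~6000 records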
The first call to poll() creates any threads necessary, connects to servers, joins the group, and so on. The consumer is not thread safe: you can't call its methods from different threads at the same time, or you'll get an exception.
max.poll.interval.ms: by increasing the interval between expected polls, you give the consumer more time to handle a batch of records returned from poll(). The drawback is that increasing this value may delay a group rebalance, since the consumer only joins the rebalance inside a call to poll.
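A sketch of the single-threaded poll loop this implies, continuing the snippet from the question (the interval value and the process() handler are illustrative, not part of the original question):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Set before the KafkaConsumer is created: allow up to 10 minutes between
    // polls before the consumer is considered failed and the group rebalances.
    props.put("max.poll.interval.ms", "600000");

    // KafkaConsumer is not thread safe: poll() and every other call stay on this one thread.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical handler; the whole batch must be
                             // processed within max.poll.interval.ms
        }
    }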
Before learning how to make multiple consumers read data from Kafka topics, it is necessary to understand the concept of consumers and consumer groups. Applications that need to read and process data from Kafka topics are called consumers; a consumer application subscribes to Kafka topics to receive messages from them. A consumer group is a means of grouping several consumers together, and consumers can be configured with either the same consumer group or different ones.
Unlike many traditional messaging systems, Kafka scales to a large number of consumers and consumer groups without reducing performance. For example, if a consumer group G1 is reading topic T1 and we add a new consumer group G2 with a single consumer, that consumer will get all the messages in topic T1 independent of what G1 is doing.
If you want to read more about performance metrics for monitoring Kafka consumers, see Kafka's Consumer Fetch Metrics. A basic consumer configuration needs a host:port bootstrap server address for connecting to a Kafka broker, as well as deserializers to transform the message keys and values.
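A minimal configuration matching that description, using the G1/T1 names from the example above (the broker address is assumed):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // host:port of a broker (assumed)
    props.put("group.id", "G1"); // consumers sharing "G1" split T1's partitions between
                                 // them; a consumer configured with "G2" instead would
                                 // receive every message independently of G1
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("T1"));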
You can increase the Consumer poll() batch size by increasing max.partition.fetch.bytes, but as per the documentation it is still limited by fetch.max.bytes, which also needs to be increased to accommodate the required batch size. The documentation also mentions one more property, message.max.bytes in the Broker config (max.message.bytes in the Topic config), which restricts the batch size on the broker side. So one way is to increase all of these properties based on your required batch size, as sketched below.
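Continuing the consumer Properties from the question above, the consumer-side part might look like this (the values are illustrative; size them to your target batch):

    // Set before the KafkaConsumer is created.
    props.put("max.partition.fetch.bytes", "10485760"); // 10 MB per partition (default 1048576)
    props.put("fetch.max.bytes", "104857600");          // 100 MB per fetch request (default 52428800)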
In Consumer config
max.partition.fetch.bytes default value is 1048576
The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size.
In Consumer Config
fetch.max.bytes default value is 52428800
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
In Broker config
message.max.bytes default value is 1000012
The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large.
In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.
This can be set per topic with the topic level max.message.bytes config.
In Topic config
max.message.bytes default value is 1000012
The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large.
In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.
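The broker-level message.max.bytes lives in the broker configuration, while the topic-level max.message.bytes can be changed at runtime, for example via the Java AdminClient. A minimal sketch (the topic name, broker address, and the 2 MB value are placeholders):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    Properties adminProps = new Properties();
    adminProps.put("bootstrap.servers", "localhost:9092"); // assumed broker address

    try (AdminClient admin = AdminClient.create(adminProps)) {
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
        AlterConfigOp raise = new AlterConfigOp(
                new ConfigEntry("max.message.bytes", "2097152"), // 2 MB, example value
                AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Map.of(topic, Collections.singletonList(raise));
        admin.incrementalAlterConfigs(updates).all().get(); // blocks until applied
    }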
Most probably your payload is limited by max.partition.fetch.bytes, which is 1 MB by default. Refer to the Kafka Consumer configuration.
Here's a good detailed explanation:
MAX.PARTITION.FETCH.BYTES
This property controls the maximum number of bytes the server will return per partition. The default is 1 MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the consumer. So if a topic has 20 partitions and you have 5 consumers, each consumer will need to have 4 MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory, as each consumer will need to handle more partitions if other consumers in the group fail.
max.partition.fetch.bytes must be larger than the largest message a broker will accept (determined by the message.max.bytes property in the broker configuration), or the broker may have messages that the consumer will be unable to consume, in which case the consumer will hang trying to read them.
Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid session timeout and a subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.
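The arithmetic behind the 4 MB figure above, as a quick sketch:

    // 20 partitions spread over 5 consumers -> 4 partitions assigned to each consumer.
    int partitionsPerConsumer = 20 / 5;
    long maxPartitionFetchBytes = 1048576;  // the 1 MB default
    long worstCasePollBytes = partitionsPerConsumer * maxPartitionFetchBytes;
    System.out.println(worstCasePollBytes); // 4194304 bytes, i.e. 4 MB per poll() worst case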
Hope it helps!