 

Kafka: set the maximum number of messages to read from a topic

I am new to Apache Kafka and am exploring the SimpleConsumer to read messages from a topic.

I use the following piece of code to do this:

FetchRequestBuilder builder = new FetchRequestBuilder();
// The last argument (1024) is the fetch size in bytes for this partition.
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, 1024).build();
FetchResponse fetchResponse;
try {
    fetchResponse = consumer.fetch(fetchRequest);
} catch (Exception e) {
    // Don't swallow fetch failures silently; at minimum, log them.
    e.printStackTrace();
}

This reads all the available messages in the specific partition; I would like to set the maximum number of messages to be read. Is there a way to do this at this stage? When there is a large number of messages in the queue, I don't want all of them landing in the JVM heap.

Another question,

The following code returns a ByteBufferMessageSet.

fetchResponse.messageSet(topic, partitionId);

Does this mean that not all available messages actually land in memory?

Manikandan Kannan asked Feb 17 '15


People also ask

What is the maximum size of a message in Kafka?

An optional configuration property, "message.max.bytes", can be used to allow all topics on a broker to accept messages larger than 1 MB. It holds the largest record batch size allowed by Kafka after compression (if compression is enabled).

How to count number of messages in a Kafka topic partition?

If you are using a newer version of Kafka, you can try the kafka.tools.GetOffsetShell option as well; it displays the number of messages in each topic partition. You can also use kafkacat to count the number of messages or events, or query the topic through KsqlDB once you have created a schema for it.

How to read messages from a Kafka topic and write them?

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case, your application creates a consumer object, subscribes to the appropriate topic, and starts receiving messages, validating them, and writing the results.

How to scale data consumption from a Kafka topic?

The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group. It is common for Kafka consumers to do high-latency operations such as writing to a database or performing a time-consuming computation on the data.


2 Answers

While you can't limit the number of messages directly, you can limit the number of bytes received per topic-partition per request. However, this should be done as a configuration setting rather than in your consumer implementation code. The Kafka consumer config docs say that you can specify a maximum number of bytes read via socket.receive.buffer.bytes. This gives you finer-grained control over how much space Kafka messages take up in the JVM heap. Note that this value must be greater than or equal to the maximum message size on the broker; otherwise the producer could send messages that are too large to ever be consumed.
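As a rough sketch, the old-consumer byte-limit properties might look like the following. The values (1 MB) are illustrative only and should be kept greater than or equal to the broker's message.max.bytes:

```properties
# Illustrative values only; keep them >= the broker's message.max.bytes
fetch.message.max.bytes=1048576
socket.receive.buffer.bytes=1048576
```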

Grayson answered Sep 30 '22


max.poll.records

The maximum number of records returned in a single call to poll(). https://kafka.apache.org/documentation/#consumerconfigs
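For the newer KafkaConsumer API, a minimal sketch of bounding poll() via max.poll.records could look like this; the broker address, group id, and topic name are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedPollExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");             // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Cap the number of records handed back by each poll() call,
        // bounding how many messages sit on the heap at once.
        props.put("max.poll.records", "100");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}
```

Each call to poll() then returns at most 100 records; you commit offsets and poll again to page through the rest of the partition.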

Denis Stafichuk answered Sep 30 '22