I am new to Apache Kafka and am exploring the SimpleConsumer to read messages from a topic.
I use the following piece of code to do so:
FetchRequest fetchRequest = new FetchRequestBuilder()
        .addFetch(topic, partitionId, offset, 1024) // last argument is the fetch size in bytes
        .build();
FetchResponse fetchResponse;
try {
    fetchResponse = consumer.fetch(fetchRequest);
} catch (Exception e) {
    // An empty catch block silently hides fetch failures; at least log them.
    e.printStackTrace();
}
This reads all the available messages in the specific partition. I would like to set a maximum number of messages to be read; is there a way to do this at this stage? When there is a large number of messages in the queue, I don't want all of them landing in the JVM heap.
Another question: the following code returns a ByteBufferMessageSet.
fetchResponse.messageSet(topic, partitionId);
Does this mean that not all of the available messages actually land in memory?
An optional broker configuration property, message.max.bytes, can be used to allow all topics on a broker to accept messages larger than 1 MB. It holds the largest record batch size allowed by Kafka after compression (if compression is enabled).
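For example, a sketch of raising that limit in the broker's server.properties; the 2 MB value is an arbitrary illustration, and any per-topic max.message.bytes override should be kept consistent with it:

# server.properties on the broker; the value is illustrative
message.max.bytes=2097152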
If you are using a newer version of Kafka, you can also try kafka.tools.GetOffsetShell, which reports the latest offset of each topic partition and therefore the number of messages in each. kafkacat can likewise be used to count the number of messages or events. Suppose, for example, you have a Kafka topic named "Test1".
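A sketch, assuming a broker at localhost:9092 and the hypothetical topic Test1; --time -1 asks for the latest offset of each partition, and summing the reported offsets gives the message count (exact only while no messages have been deleted or compacted away):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic Test1 \
    --time -1

(Newer Kafka releases accept --bootstrap-server in place of the deprecated --broker-list.)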
Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case, your application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them, and writing out the results.
The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group. It is common for Kafka consumers to perform high-latency operations, such as writing to a database or running a time-consuming computation on the data, so spreading the work across consumers helps; a sketch of such a consumer follows.
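A minimal sketch of that consume-validate-write loop using the modern KafkaConsumer API; the broker address, group id, topic name, and the validate/writeResult helpers are all hypothetical placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ValidatingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "validators");              // consumers sharing a group.id split the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Test1")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (validate(record.value())) {
                        writeResult(record.value());
                    }
                }
            }
        }
    }

    // Hypothetical stand-ins for the validation and persistence steps.
    private static boolean validate(String value) { return value != null && !value.isEmpty(); }
    private static void writeResult(String value) { System.out.println("stored: " + value); }
}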
While you can't limit the number of messages directly, you can limit the number of bytes fetched per topic-partition per request. However, this should be done as a configuration setting rather than in your consumer implementation code. The Kafka consumer config docs say that you can cap the bytes fetched from each partition with fetch.message.max.bytes (in the old high-level consumer; the modern consumer's equivalent is max.partition.fetch.bytes). This should allow you to have more fine-grained control over exactly how much space Kafka messages take up in the JVM heap. Please note that this value must be equal to or greater than the maximum message size allowed on the broker, or else a producer could send messages that are too large to ever be consumed.
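A minimal sketch for the modern consumer: add one line to the Properties setup in the sketch above (the 512 KB value is an arbitrary illustration):

props.put("max.partition.fetch.bytes", "524288"); // at most 512 KB of data per partition per fetch
// Old high-level consumer equivalent (same illustrative value): fetch.message.max.bytes=524288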
max.poll.records
The maximum number of records returned in a single call to poll(). See https://kafka.apache.org/documentation/#consumerconfigs
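For the modern consumer this directly answers the original question. A minimal sketch, again extending the Properties setup from the earlier example (the cap of 100 is arbitrary):

props.put("max.poll.records", "100"); // each call to poll() now returns at most 100 records

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
System.out.println("fetched " + records.count() + " records (at most 100)");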