
fetch.max.wait.ms vs parameter to poll() method

Before I ask my question, I'd like to point out that a similar question has been asked here but it hasn't been answered so I am asking again. Please do not mark this as duplicate as the previously mentioned question doesn't have any answers.

I have a question about fetch.max.wait.ms and consumer.poll(<value>). This is what I've found while researching these settings:

The poll() method takes a timeout parameter. This specifies the maximum time poll() will block before it returns, with or without data.

If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.

So my question is: what happens when fetch.max.wait.ms=500, fetch.min.bytes=500 and the consumer calls consumer.poll(200), but the broker does not have enough data to satisfy fetch.min.bytes?

karan mirani asked Nov 04 '19 16:11


1 Answer

fetch.min.bytes

This property allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available before sending the records back to the consumer.

fetch.max.wait.ms

This property sets the maximum time the broker will wait for fetch.min.bytes of data to become available before it responds to the consumer anyway.

Example: If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.
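The "whichever happens first" rule can be sketched as a small model. This is only an illustration and not broker code: the class FetchResponseModel, the method responseDelayMs and the parameter msUntilMinBytes (how long it would take for fetch.min.bytes of data to accumulate) are hypothetical names, and network latency is ignored.

```java
// Simplified model of the broker-side rule; NOT Kafka's actual implementation.
final class FetchResponseModel {

    /**
     * @param msUntilMinBytes hypothetical time until fetch.min.bytes of data is available
     * @param fetchMaxWaitMs  value of fetch.max.wait.ms
     * @return how long the broker holds the fetch request before responding
     */
    static long responseDelayMs(long msUntilMinBytes, long fetchMaxWaitMs) {
        // The broker answers as soon as either condition is met.
        return Math.min(msUntilMinBytes, fetchMaxWaitMs);
    }

    public static void main(String[] args) {
        // Enough data arrives after 40 ms, before the 100 ms limit.
        System.out.println(responseDelayMs(40, 100));
        // Data would take 250 ms, so the 100 ms limit applies.
        System.out.println(responseDelayMs(250, 100));
    }
}
```

In the second case the broker responds after 100 ms with whatever data it has, which may be less than fetch.min.bytes.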

These two parameters control when the broker responds to a fetch request from the consumer.
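As a rough sketch, both settings are ordinary consumer properties. The example below only builds a java.util.Properties object so that it runs without the Kafka client on the classpath; the class name ConsumerFetchConfig, the broker address and the group id are placeholders assumed for illustration. In a real application the resulting properties would be passed to the KafkaConsumer constructor.

```java
import java.util.Properties;

// Builds consumer settings only; no Kafka classes are needed to run this.
final class ConsumerFetchConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder connection settings (assumptions, adjust to your cluster).
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Broker responds once 500 bytes are available ...
        props.setProperty("fetch.min.bytes", "500");
        // ... or after 500 ms, whichever happens first.
        props.setProperty("fetch.max.wait.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("fetch.min.bytes=" + props.getProperty("fetch.min.bytes"));
        System.out.println("fetch.max.wait.ms=" + props.getProperty("fetch.max.wait.ms"));
    }
}
```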

poll(timeout)

The timeout passed to poll() controls how long poll() will block on the consumer side when no data is available to return.

poll() is called on the consumer side to collect the records sent back by the broker. It first calls fetchedRecords(): if records from a completed fetch are already buffered in the consumer, it returns them immediately. Otherwise it sends any new fetch requests (which the broker answers according to fetch.min.bytes and fetch.max.wait.ms) and waits up to the given timeout for a response. If no fetch completes within that time, it returns an empty result.

This is visible in the pollForFetches method of the KafkaConsumer class:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
    final long startMs = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

    // if data is available already, return it immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in the poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    client.poll(pollTimeout, startMs, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.rejoinNeededOrPending()) {
        return Collections.emptyMap();
    }

    return fetcher.fetchedRecords();
}

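With fetch.min.bytes=500 and fetch.max.wait.ms=500, the broker responds to the consumer when it has 500 bytes of data to return or after 500 ms, whichever happens first. On the consumer side, poll(200) blocks for at most 200 ms. If the broker has not responded by then, poll() returns an empty result, but the fetch request stays pending (sendFetches() does not resend pending fetches), so a later poll() call picks up the response through fetchedRecords() once the broker has replied.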
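The timing for the values in the question can be walked through with a small model. It is a simplification under stated assumptions, not Kafka code: the consumer is assumed to call poll() back to back starting at t=0, the fetch is sent at t=0, network latency is ignored, and PollTimelineModel, pollThatSeesResponse and msUntilMinBytes are hypothetical names.

```java
// Simplified timeline model; NOT Kafka's actual implementation.
final class PollTimelineModel {

    /**
     * Returns the 1-based index of the poll() call during which the
     * broker's fetch response arrives at the consumer.
     */
    static int pollThatSeesResponse(long pollTimeoutMs, long fetchMaxWaitMs, long msUntilMinBytes) {
        if (pollTimeoutMs <= 0) {
            throw new IllegalArgumentException("pollTimeoutMs must be positive");
        }
        // Broker responds when min bytes are available or the max wait expires.
        long responseAtMs = Math.min(msUntilMinBytes, fetchMaxWaitMs);
        int pollIndex = 1;
        long pollStartMs = 0;
        // A poll that times out before the response arrives returns empty;
        // the fetch stays pending and the next poll keeps waiting for it.
        while (pollStartMs + pollTimeoutMs < responseAtMs) {
            pollStartMs += pollTimeoutMs;
            pollIndex++;
        }
        return pollIndex;
    }

    public static void main(String[] args) {
        // Values from the question; the broker never reaches fetch.min.bytes.
        System.out.println(pollThatSeesResponse(200, 500, Long.MAX_VALUE)); // prints 3
    }
}
```

In this model the first two poll(200) calls return empty at 200 ms and 400 ms, and the third call receives the broker's response at about 500 ms, when fetch.max.wait.ms expires.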

Nitin answered Sep 22 '22 08:09