 

How does a Kafka Consumer consume from multiple assigned partitions?

tl;dr: I am trying to understand how a single consumer that is assigned multiple partitions handles consuming records from each partition.

For example:

  • Completely process a single partition before moving on to the next?
  • Process a chunk of available records from each partition every time?
  • Process a batch of N records from the first available partitions?
  • Process a batch of N records from partitions in round-robin rotation?

I found the partition.assignment.strategy configuration for the Range or RoundRobin assignors, but this only determines how partitions are assigned to consumers, not how a consumer consumes from the partitions it is assigned.
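
For reference, that setting is just a plain consumer config; a minimal sketch (broker address, group id, and deserializers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.RoundRobinAssignor;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "dead-letter-reprocessor");      // placeholder group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    // Only controls how partitions are split across the consumers in the group,
    // not how one consumer drains the partitions it ends up with.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);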

I started digging into the KafkaConsumer source: #poll() led me to #pollForFetches(), which in turn led me to fetcher#fetchedRecords() and fetcher#sendFetches().

From there I tried to follow the entire Fetcher class altogether, and maybe it is just late or maybe I just didn't dig in far enough, but I am having trouble untangling exactly how a consumer processes multiple assigned partitions.

Background

Working on a data pipeline backed by Kafka Streams.

At several stages in this pipeline, as records are processed by different Kafka Streams applications, the stream is joined to compacted topics fed by external data sources. These topics provide the data used to augment the records before they continue to the next stage of processing.

Along the way there are several dead letter topics for records that could not be matched to the external data sources that would have augmented them. This could be because the data is just not available yet (the Event or Campaign is not live yet), or because the data is bad and will never match.

The goal is to republish records from the dead letter topic whenever new augmenting data is published, so that previously unmatched records from the dead letter topic can be matched, updated, and sent downstream for additional processing.

Records may have failed to match on several attempts and could have multiple copies in the dead letter topic, so we only want to reprocess existing records (those before the latest offset at the time the application starts) as well as records sent to the dead letter topic since the last time the application ran (those after the previously saved consumer group offsets).
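
The cutoff I have in mind looks roughly like this sketch (plain KafkaConsumer, placeholder topic/group names, error handling omitted):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class DeadLetterReplay {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");                 // placeholder
            props.put("group.id", "dead-letter-reprocessor");                 // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("dead-letter-topic"));             // placeholder topic

                // Snapshot the end offsets once the first assignment is known, so anything
                // produced after startup is left for the next run.
                Map<TopicPartition, Long> cutoffs = new HashMap<>();

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    if (cutoffs.isEmpty() && !consumer.assignment().isEmpty()) {
                        cutoffs.putAll(consumer.endOffsets(consumer.assignment()));
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        Long cutoff = cutoffs.get(new TopicPartition(record.topic(), record.partition()));
                        if (cutoff != null && record.offset() >= cutoff) {
                            continue; // arrived after startup; skip it for this run
                        }
                        // ... try to re-match and republish the record here
                    }
                }
            }
        }
    }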

This works well: my consumer filters out any records arriving after the application has started, and my producer manages the consumer group offsets by committing them as part of the publishing transaction.
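
That producer side is roughly the usual consume-transform-produce pattern, sketched below (assumes Kafka clients 2.5+ for groupMetadata(), a producer already created with a transactional.id and initTransactions() called, and a placeholder output topic):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class TransactionalRepublish {
        // Republish a batch and commit the consumer group offsets in the same transaction,
        // so a crash can never commit one without the other.
        static void republish(KafkaConsumer<String, String> consumer,
                              KafkaProducer<String, String> producer,
                              ConsumerRecords<String, String> records) {
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("retry-topic", record.key(), record.value())); // placeholder topic
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }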

But I want to make sure that I will eventually consume from all partitions, as I have run into an odd edge case: unmatched records get reprocessed, land in the same partition of the dead letter topic as before, and then get filtered out by the consumer. And even though the consumer is not getting new batches of records to process, there are partitions that have not been reprocessed yet either.

Any help understanding how a single consumer processes multiple assigned partitions would be greatly appreciated.

asked Feb 07 '19 05:02 by DVS


1 Answer

You were on the right track looking at Fetcher, as most of the logic is there.

First, as the Consumer Javadoc mentions:

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption.
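
You can see that directly in what poll() returns: a single call can hand back records from several of the assigned partitions, grouped per partition. A quick sketch, assuming an already-subscribed consumer:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // One poll() can return records for several of the assigned partitions at once.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (TopicPartition tp : records.partitions()) {
        for (ConsumerRecord<String, String> record : records.records(tp)) {
            System.out.printf("partition=%d offset=%d%n", tp.partition(), record.offset());
        }
    }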

As you can imagine, in practice, there are a few things to take into account.

  • Each time the consumer is trying to fetch new records, it will exclude partitions for which it already has records awaiting (from a previous fetch). Partitions that already have a fetch request in-flight are also excluded.

  • When fetching records, the consumer specifies fetch.max.bytes and max.partition.fetch.bytes in the fetch request. These are used by the brokers to respectively determine how much data to return in total and per partition. This is equally applied to all partitions.

Using these two mechanisms, by default, the Consumer tries to consume from all partitions fairly. If that's not happening, changing fetch.max.bytes or max.partition.fetch.bytes usually helps.
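
Both are plain consumer configs, for example (the values shown are roughly the defaults, about 50 MB in total and 1 MB per partition):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // Cap on the total data returned in one fetch response, across all partitions.
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
    // Cap on the data returned for any single partition in that response.
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);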

In case you want to prioritize some partitions over others, you need to use pause() and resume() to manually control the consumption flow.
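
A rough sketch of that flow, assuming an already-assigned consumer and hypothetical helpers for choosing the priority partitions and deciding when they are drained:

    import java.time.Duration;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    Set<TopicPartition> assigned = consumer.assignment();
    Set<TopicPartition> priority = pickPriorityPartitions(assigned);   // hypothetical helper
    Set<TopicPartition> rest = new HashSet<>(assigned);
    rest.removeAll(priority);

    consumer.pause(rest);        // poll() now only returns records for the priority partitions
    while (!caughtUp(consumer, priority)) {                            // hypothetical "done" check
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        // ... process the priority partitions first
    }
    consumer.resume(rest);       // back to fair consumption across all assigned partitions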

answered Nov 16 '22 00:11 by Mickael Maison