 

Increase Kafka Streams Consumer Throughput

I have a Spark Streaming application and a Kafka Streams application running side by side, for benchmarking purposes. Both consume from the same input topic and write to different target databases. The input topic has 15 partitions, and both Spark Streaming and Kafka Streams have 15 consumers (1:1 ratio). Event payloads are around 2 KB. Not sure if it's relevant, but the 90th percentile execution time is around 9 ms for Spark Streaming and 12 ms for Kafka Streams. The commit() method is invoked in my Processor every time a message is processed.

The problem shows up during high bursts. Spark Streaming can keep up with 700 events per second, while Kafka Streams manages only around 60-70 per second; I can't get beyond that. See the graph below (green line - Spark Streaming / blue line - Kafka Streams).


As per the config below, as long as the rate doesn't exceed 1,000 events per consumer, Spark Streaming can keep up (thanks to backpressure), regardless of the number of bytes per partition. As for Kafka Streams, if I understood its configs correctly (and please keep me honest), the same config should let me fetch a maximum of 1,000 records (max.poll.records) every 100 ms (poll.ms), as long as it doesn't exceed 1 MB per partition (max.partition.fetch.bytes) and 50 MB per fetch (fetch.max.bytes).

I see the same results (stuck at 70 events per second) regardless of whether I am using 5, 10 or 15 consumers, which leads me to think it is config related. I tried tweaking these settings by increasing the number of records per poll and the max bytes per partition, but I didn't see a significant improvement.

I am aware these are different technologies intended for different purposes, but I am wondering what values I should use in Kafka Streams to get better throughput.

Spark Streaming config:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100

Kafka Streams config (only the bytes- and timing-related settings):

# Consumer Config
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
heartbeat.interval.ms = 3000 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 1000 
request.timeout.ms = 30000
enable.auto.commit = false

# StreamsConfig
poll.ms=100 
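
For reference, this is roughly how settings like these map onto a Streams application in code: consumer-level properties are passed with the consumer prefix, while poll.ms is a StreamsConfig setting. A minimal sketch only; the application id, bootstrap servers and the topology variable are placeholders, not my actual setup:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// ...

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "benchmark-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

// StreamsConfig-level setting
props.put(StreamsConfig.POLL_MS_CONFIG, 100);                         // poll.ms

// Consumer-level settings need the "consumer." prefix
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 1048576);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 500);

KafkaStreams streams = new KafkaStreams(topology, props);             // topology built elsewhere
streams.start();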

Processor Code

import java.util.UUID;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

// ResponseEntity and MyException are application-specific types
public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String payload) {

        ResponseEntity responseEntity = null;
        try {

            // Do some processing

        } catch (final MyException e) {

            // Do some exception handling

        } finally {

            // Forward the result downstream and commit after every single record
            context.forward(UUID.randomUUID().toString(), responseEntity);
            context.commit();
        }
    }
}

Thanks in advance!

Asked Jun 23 '20 by Guilherme Alcântara



2 Answers

UPDATE

The database Kafka Streams was writing to was the big bottleneck here. After we switched it to a better cluster (better hardware, memory, cores, etc.), I tuned the app with the config below and was able to consume around 2k events per second. The commit interval config was also changed (as per Augusto's suggestion), and we switched to the G1GC garbage collector.

fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576 

fetch.max.wait.ms = 1000 
max.poll.records = 10000 
fetch.min.bytes = 100000
enable.auto.commit = false
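
For completeness, a minimal sketch of how the commit-interval change can be expressed in code; the 30-second value and the jar name are only illustrative, not necessarily what we settled on, and G1GC is a JVM flag rather than a Streams property:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// ...

Properties props = new Properties();

// Let Streams commit on an interval instead of calling context.commit() per record
// (30 seconds is just an illustrative value)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);

// The fetch-side tuning from the config above, prefixed for the embedded consumer
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 100000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 1000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10000);

// G1GC is enabled on the JVM, not via Streams config, e.g.:
//   java -XX:+UseG1GC -jar my-streams-app.jar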

Answered Dec 30 '22 by Guilherme Alcântara


if I understood its configs correctly (and please keep me honest), based on the same below, I am able to fetch a max of 1000 records (max.poll.records) every 100ms (poll.ms), as long as it doesn't exceed 1MB per partition (max.partition.fetch.bytes) and 50MB per fetch (fetch.max.bytes).

That is not correct. :) max.poll.records specifies how many records may be returned by poll() -- if a single "fetch" to the broker returns more records, the next poll() call will be served from the consumer's internal buffer (ie, no network request). max.poll.records is basically a knob to tune your application code, ie, how many records do I want to process before poll() is called again. Calling poll() more frequently makes your application more reactive (for example, a rebalance only happens when poll() is called -- you also need to call poll() often enough not to violate max.poll.interval.ms).

poll.ms is the maximum blocking time within poll() in case no data is available. This avoids busy waiting. However, if there is data, poll() will return immediately.

Thus, the actual "network throughput" is based on the "fetch request" settings only.
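
To make that split concrete, here is a small annotated sketch of consumer properties (the values are just examples) separating the settings that shape the broker fetch request from the one that only controls how many buffered records poll() hands back:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// ...

Properties consumerProps = new Properties();

// These shape the actual fetch requests sent to the broker, ie, network throughput:
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);            // wait for ~100 KB ...
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // ... or at most 500 ms
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // per-partition cap
consumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);          // per-request cap

// This only caps how many already-fetched records a single poll() hands to the
// application; any surplus stays in the consumer's internal buffer:
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

In a Streams app, the same keys would be set with StreamsConfig.consumerPrefix(...) as shown in the question.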

Answered Dec 30 '22 by Matthias J. Sax