I have a Spark Streaming application and a Kafka Streams application running side by side, for benchmarking purposes. Both consume from the same input topic and write to different target databases. The input topic has 15 partitions, and both Spark Streaming and Kafka Streams have 15 consumers (1:1 ratio). In addition, event payloads are around 2 KB. Not sure if it's relevant, but the 90th-percentile execution time for Spark Streaming is around 9 ms; for Kafka Streams, 12 ms. The commit() method is invoked in my Processor every time a message is processed.
The problem lies in high bursts. Spark Streaming can keep up with 700 events per second, while Kafka Streams only manages around 60-70 per second; I can't go beyond that. See the graph below (green line - Spark Streaming / blue line - Kafka Streams):
As per the config below, as long as it doesn't exceed 1000 events per consumer, and considering the backpressure, Spark Streaming can keep up regardless of the number of bytes per partition. As for Kafka Streams, if I understood its configs correctly (and please keep me honest), based on the same config below, I am able to fetch a maximum of 1000 records (max.poll.records) every 100 ms (poll.ms), as long as it doesn't exceed 1 MB per partition (max.partition.fetch.bytes) and 50 MB per fetch (fetch.max.bytes).
I see the same results (stuck at 70 events per second) regardless of whether I am using 5, 10, or 15 consumers, which leads me to think it is config related. I tried to tweak these by increasing the number of records per fetch and the max bytes per partition, but I didn't see a significant difference.
I am aware these are different technologies used for different purposes, but I am wondering what values I should use in Kafka Streams to get better throughput.
Spark Streaming config:
spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100
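For reference, a minimal sketch of how these properties might be applied in a DStream-based driver; note that spark.batch.duration appears to be an application-level setting, and seconds are assumed here:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("spark-streaming-benchmark")                    // placeholder app name
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.backpressure.initialRate", "1000")
    .set("spark.streaming.kafka.maxRatePerPartition", "100");

// spark.batch.duration=10 is assumed to be the batch interval in seconds
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));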
Kafka Streams config (all bytes- and timing-related settings):
# Consumer Config
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
request.timeout.ms = 30000
enable.auto.commit = false
# StreamsConfig
poll.ms=100
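For reference, a minimal sketch of how these values might be passed to a Kafka Streams application (application id and bootstrap servers are placeholders); consumer-level settings can be forwarded through StreamsConfig.consumerPrefix():
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstreams-benchmark");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
props.put(StreamsConfig.POLL_MS_CONFIG, 100);
// Consumer-level settings are forwarded with the consumer prefix:
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 1048576);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 500);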
Processor Code
import java.util.UUID;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
  }

  @Override
  public void process(String key, String payload) {
    ResponseEntity responseEntity = null;
    try {
      // Do some processing
    } catch (final MyException e) {
      // Do some exception handling
    } finally {
      // Forward the result downstream and request a commit after every record
      context.forward(UUID.randomUUID().toString(), responseEntity);
      context.commit();
    }
  }
}
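For context, a minimal sketch of how this processor might be wired into a topology (topic names are placeholders, the sink is only illustrative since the real job writes to a database, and props refers to the Streams properties shown earlier):
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

Topology topology = new Topology();
topology.addSource("source", "input-topic");                                  // placeholder topic
topology.addProcessor("processor", KStreamsMessageProcessor::new, "source");
topology.addSink("sink", "output-topic", "processor");                        // placeholder topic

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();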
Thanks in advance!
Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.
Kafka consumers: if there are too many producers writing data to the same topic while there is only a limited number of consumers, then the reading processes will always be slow and the real-time objectives are lost.
Consumer concurrency can increase performance. If you store offsets in ZooKeeper, it can become a bottleneck; reduce the frequency of offset commits and use a dedicated ZooKeeper ensemble if possible. The best solution is storing offsets on the brokers themselves.
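In Kafka Streams specifically, consuming concurrency within a single instance is controlled by the number of stream threads; a minimal sketch (the value is just an example, and useful parallelism is still capped by the 15 input partitions):
// In the Streams properties (value is illustrative):
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);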
UPDATE
The database that Kafka Streams was writing to was the big bottleneck here. After we switched it to a better cluster (better hardware, memory, cores, etc.), I tuned it with the config below and was able to consume around 2k events per second. The commit interval config was also changed (as per Augusto's suggestion), and I also used the G1GC garbage collector.
fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576
fetch.max.wait.ms = 1000
max.poll.records = 10000
fetch.min.bytes = 100000
enable.auto.commit = false
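The commit interval and GC changes mentioned above would look roughly like this; the 1000 ms value and the JVM flag are illustrative, not the exact values used:
// Streams-level commit interval (how often offsets and state are committed); value is illustrative:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// G1GC is enabled via a JVM flag on the Streams application, e.g.:
//   java -XX:+UseG1GC -jar my-streams-app.jar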
"if I understood its configs correctly (and please keep me honest), based on the same below, I am able to fetch a max of 1000 records (max.poll.records) every 100ms (poll.ms), as long as it doesn't exceed 1MB per partition (max.partition.fetch.bytes) and 50MB per fetch (fetch.max.bytes)."
That is not correct. :) max.poll.records specifies how many records may be returned by poll() -- if a single "fetch" to the broker returns more records, the next poll() call will be served from the consumer's internal buffer (i.e., no network request). max.poll.records is basically a knob to tune your application code, i.e., how many records do I want to process before poll() is called again. Calling poll() more frequently makes your application more reactive (for example, a rebalance only happens when poll() is called -- you also need to call poll() often enough to not violate max.poll.interval.ms).
poll.ms is the maximum blocking time within poll() in case no data is available. This avoids busy waiting. However, if there is data, poll() will return immediately.
Thus, the actual "network throughput" is based on the "fetch request" settings only.
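To illustrate the distinction above, here is a minimal plain-consumer sketch (topic, group id, and servers are placeholders): a single fetch may pull many records into the consumer's internal buffer, and subsequent poll() calls drain that buffer up to max.poll.records at a time without issuing a new network request:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-demo");                 // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);                // caps records per poll(), not per fetch

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("input-topic"));       // placeholder topic
    while (true) {
        // Blocks for at most 100 ms if no data is buffered (analogous to poll.ms);
        // returns immediately if buffered records are available.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // process record
        }
    }
}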