
Apache Kafka Streams: Materializing KTables to a topic seems slow

I'm using Kafka Streams and I'm trying to materialize a KTable into a topic.

It works, but the materialization only seems to happen every 30 seconds or so.

How and when does Kafka Streams decide to materialize the current state of a KTable into a topic?

Is there any way to shorten this interval and make it more "real-time"?

Here is the actual code I'm using:

// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);

// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());

// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
asked Jun 23 '17 by thomas.g

People also ask

What is the difference between a Kafka topic and a Kafka stream?

The topic is the most important abstraction provided by Kafka: it is a category or feed name to which data is published by producers. Every topic in Kafka is split into one or more partitions. Kafka partitions data for storing, transporting, and replicating it. Kafka Streams partitions data for processing it.
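
As an illustration (not part of the original page), a topic's partition count is fixed when the topic is created, for example with the AdminClient; the topic name, partition count, and broker address below are placeholder values:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092"); // assumed local broker

try (AdminClient admin = AdminClient.create(adminProps)) {
    // Create "random-ints" with 3 partitions and replication factor 1 (illustrative values).
    NewTopic topic = new NewTopic("random-ints", 3, (short) 1);
    admin.createTopics(Collections.singleton(topic)).all().get();
}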

How do I scale a Kafka Streams application?

So, if I understand this correctly, the only way to scale a Kafka Streams application is to start a new instance of the application (or increase the number of stream threads in the application). This ensures that there will be more consumers under the consumer group ('application.id'), so I can scale my stream application up to ...
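
A minimal sketch of those two scaling knobs, assuming a standard StreamsConfig setup (the application id, broker address, and thread count are placeholder values):

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

Properties props = new Properties();
// Every instance started with the same application.id joins the same consumer group,
// so input partitions are rebalanced across all running instances.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "random-count-app"); // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
// More threads per instance means more stream tasks processed in parallel,
// bounded overall by the number of input topic partitions.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);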

Does Kafka Streams use RocksDB?

Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, implement RocksDBConfigSetter and provide your custom class via the rocksdb.config.setter configuration.
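
For reference, a minimal RocksDBConfigSetter sketch; the class name and the tuned option are illustrative, not taken from the answer above:

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Illustrative tuning: limit background compaction threads for this store.
        options.setMaxBackgroundCompactions(2);
    }
}

// Registered via the rocksdb.config.setter property, e.g.:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);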

Is Kafka good for real-time?

It's proven, scalable, and fault-tolerant. Kafka is particularly valuable in scenarios requiring real-time data processing and application activity tracking, as well as for monitoring purposes. It's less appropriate for data transformations on-the-fly, data storing, or when all you need is a simple task queue.


1 Answer

This is controlled by commit.interval.ms, which defaults to 30s. More details here: http://docs.confluent.io/current/streams/developer-guide.html

The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earliest of commit.interval.ms or cache.max.bytes.buffering (cache pressure) hits.

and here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
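
To make the output more "real-time", both settings can be lowered in the streams configuration. A minimal sketch with illustrative values (the rest of the configuration is assumed to exist as in the question):

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

Properties props = new Properties();
// Flush the cache and forward records downstream more often (default is 30000 ms).
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Or disable record caching entirely so every update is forwarded immediately.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);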

answered Oct 17 '22 by Michal Borowiecki