I'm using Kafka Streams and I'm trying to materialize a KTable into a topic.
It works, but the output only seems to be written every 30 seconds or so.
How and when does Kafka Streams decide to materialize the current state of a KTable into a topic?
Is there any way to shorten this interval and make it more "real-time"?
Here is the actual code I'm using:
// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);
// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());
// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");
// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
This is controlled by commit.interval.ms, which defaults to 30s. More details here: http://docs.confluent.io/current/streams/developer-guide.html
The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earliest of commit.interval.ms or cache.max.bytes.buffering (cache pressure) hits.
and here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
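As a rough sketch of how the two settings mentioned above could be lowered, the snippet below builds a plain java.util.Properties object using the literal config names commit.interval.ms and cache.max.bytes.buffering. The class name, the method name, and the chosen values (100 ms and 0 bytes) are assumptions made for illustration, not recommendations. In a real application these entries would be merged with application.id, bootstrap.servers and so on, and passed to the KafkaStreams constructor.

```java
import java.util.Properties;

// Hypothetical helper class; the name is not part of any Kafka API.
public class LowLatencyStreamsConfig {

    // Builds only the two settings discussed in the answer.
    // The values are illustrative assumptions.
    public static Properties lowLatencyConfig() {
        Properties props = new Properties();
        // Commit, and therefore flush the record cache, every 100 ms
        // instead of the 30 s default.
        props.put("commit.interval.ms", "100");
        // A cache size of 0 bytes disables record caching, so each
        // KTable update is forwarded downstream as it happens.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = lowLatencyConfig();
        // Print the settings; a real application would add application.id,
        // bootstrap.servers, etc. before starting the streams instance.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Either setting alone shortens the delay. The trade-off is that less caching means more records written to the output topic and to the state store, since consecutive updates for the same key are no longer collapsed.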