 

How to store only the latest value for each key in a Kafka topic

I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.

I thought the whole purpose of a KTable was to store only the latest value for each key rather than the whole stream of events. However, I can't get this to work. Running the code below creates the state store, but that store (maintopiclatest) contains a stream of events, not just the latest values. So if I send the same 1000 records to the topic twice, I see 2000 records instead of 1000.

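// Value serializer/deserializer classes used by the asker (definitions not shown)
var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();

// Read "maintopic" as a stream with String keys and the custom value serde
var stream = kStreamBuilder.stream("maintopic",
    Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));

// Keep only the newest value per key in the state store "maintopiclatest"
var table = stream
    .groupByKey()
    .reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));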

The other problem is that I'm not sure how to write the KTable to a new topic. It seems I have to turn it back into a stream (toStream()) so that I can call ".to()" on it. But then the new topic contains the whole stream of events, not just the latest values.

asked Oct 17 '22 07:10 by emirhosseini



1 Answer

This is not how a KTable works.

A KTable itself has an internal state store and stores exactly one record per key. However, a KTable is constantly updated and subject to the so-called stream-table duality: each update to the KTable is sent downstream as a changelog record (https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables). Thus, each input record results in an output record.
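To make the duality concrete, here is a minimal plain-Java model of groupByKey().reduce((aggV, newV) -> newV). It has no Kafka dependency, and the class and method names are hypothetical. A map stands in for the KTable's state store and a list stands in for the changelog records sent downstream. It deliberately ignores serdes, record caching and tombstones, so treat it as a sketch of the semantics, not of the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Kafka-free model of groupByKey().reduce(...): the map plays the KTable's
// state store, the list plays the changelog records forwarded downstream.
class KTableChangelogDemo {

    static final class SimpleTable {
        private final Map<String, String> store = new LinkedHashMap<>();            // exactly one entry per key
        private final List<Map.Entry<String, String>> changelog = new ArrayList<>(); // one entry per update
        private final BinaryOperator<String> reducer;

        SimpleTable(BinaryOperator<String> reducer) {
            this.reducer = reducer;
        }

        void process(String key, String value) {
            // The first value for a key becomes the aggregate; later values are
            // combined with the current aggregate via the reducer.
            String current = store.get(key);
            String updated = (current == null) ? value : reducer.apply(current, value);
            store.put(key, updated);
            // Every update is forwarded downstream, even for keys seen before.
            changelog.add(Map.entry(key, updated));
        }

        int storeSize() { return store.size(); }
        int changelogSize() { return changelog.size(); }
        String get(String key) { return store.get(key); }
    }

    // Sends `keys` distinct keys `rounds` times, like producing the same batch repeatedly.
    static SimpleTable run(int keys, int rounds) {
        // Same "keep the newest value" reducer as in the question.
        SimpleTable table = new SimpleTable((aggV, newV) -> newV);
        for (int round = 1; round <= rounds; round++) {
            for (int k = 0; k < keys; k++) {
                table.process("key-" + k, "value-" + round);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        SimpleTable table = run(1000, 2);
        System.out.println("entries in store:     " + table.storeSize());     // 1000
        System.out.println("records in changelog: " + table.changelogSize()); // 2000
    }
}
```

Sending 1000 keys twice leaves 1000 entries in the store but 2000 records in the changelog, the same 1000-versus-2000 pattern described in the question.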

Because it's stream processing, there is no "last value per key".

I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.

At which point in time do you want a KTable to emit an update? There is no answer to this question because the input stream is conceptually infinite: another update for any key could always arrive later.
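To illustrate why "latest" depends on a point in time, here is a small plain-Java sketch (no Kafka involved; the class and method names are made up for illustration). It computes the latest value per key after reading a given number of records. The result depends on how far into the stream you look, and with an unbounded stream there is never a final snapshot.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Kafka): the "latest value per key" is only
// defined for a given prefix of the stream, i.e. for a point in time.
class LatestPerKeySnapshot {

    // Returns the latest value per key among the first `count` records.
    static Map<String, String> latestAfter(List<Map.Entry<String, String>> stream, int count) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : stream.subList(0, count)) {
            latest.put(record.getKey(), record.getValue()); // later records overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        // A finite prefix of what is conceptually an unbounded stream.
        List<Map.Entry<String, String>> stream = List.of(
                Map.entry("A", "1"),
                Map.entry("B", "1"),
                Map.entry("A", "2"),
                Map.entry("B", "2"));

        System.out.println(latestAfter(stream, 2)); // {A=1, B=1}
        System.out.println(latestAfter(stream, 4)); // {A=2, B=2}
        // A fifth record could arrive at any time and change the answer again.
    }
}
```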

answered Oct 26 '22 22:10 by Matthias J. Sax
