
Kafka Streams API: KStream to KTable

I have a Kafka topic where I send location events (key=user_id, value=user_location). I am able to read and process it as a KStream:

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, Location> locations = builder
            .stream("location_topic")
            .map((k, v) -> {
                // some processing here, omitted for clarity
                Location location = new Location(lat, lon);
                return new KeyValue<>(k, location);
            });

That works well, but I'd like to have a KTable with the last known position of each user. How could I do it?

I am able to do it writing to and reading from an intermediate topic:

    // write to intermediate topic
    locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

    // build KTable from intermediate topic
    KTable<String, Location> table = builder.table("location_topic_aux", "store");

Is there a simple way to obtain a KTable from a KStream? This is my first app using Kafka Streams, so I'm probably missing something obvious.

Guido asked Mar 21 '17 20:03



1 Answer

Update:

Since Kafka 2.5, the method KStream#toTable() provides a convenient way to transform a KStream into a KTable. For details see: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
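
A minimal sketch of how KStream#toTable() could be used, assuming Kafka Streams 2.5 or later is on the classpath and using the newer StreamsBuilder API rather than the KStreamBuilder from the question. The class name LastLocationTopology is hypothetical, and the values are plain strings as a stand-in for the Location type, whose serde is not shown in the question:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Hypothetical class name, used only for illustration.
public class LastLocationTopology {

    // Builds a topology that reads the location stream and turns it into a table.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumption: values are strings here; the question uses a custom Location type.
        KStream<String, String> locations = builder.stream(
                "location_topic",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Each record is treated as an upsert, so the table holds the latest value per key.
        KTable<String, String> lastLocation = locations.toTable();

        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the processor topology; no broker connection is needed for this.
        System.out.println(build().describe());
    }
}
```

If the store should have a queryable name, there is also an overload of toTable() that accepts a Materialized. Note that if the key was changed upstream (for example with map instead of mapValues), Kafka Streams may insert a repartition topic before the table.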

Original Answer:

There is no straightforward way to do this at the moment. Your approach is absolutely valid, as discussed in the Confluent FAQs: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.

There is one alternative, using a "dummy-reduce":

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, Long> stream = ...; // some computation that creates the derived KStream

    KTable<String, Long> table = stream.groupByKey().reduce(
        new Reducer<Long>() {
            @Override
            public Long apply(Long aggValue, Long newValue) {
                return newValue;
            }
        },
        "dummy-aggregation-store");

This approach is somewhat more complex with regard to the code than option 1 (the intermediate topic), but it has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.
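
To see why a reducer that simply returns newValue leaves the latest value per key in the table, here is a plain-Java sketch that needs no Kafka at all. It only imitates the per-key fold conceptually; the class and method names are hypothetical, and Kafka specifics (state stores, changelog topics, records with null keys or values being skipped) are left out:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical helper, not part of Kafka Streams.
public class DummyReduceSketch {

    // Folds keyed records into a per-key view, similar in spirit to groupByKey().reduce(...).
    public static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> records,
                                               BinaryOperator<V> reducer) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // The first value for a key is stored as-is; later values go through the reducer.
            table.merge(record.getKey(), record.getValue(), reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("alice", 1L));
        records.add(new SimpleEntry<>("bob", 7L));
        records.add(new SimpleEntry<>("alice", 3L));

        // The "dummy" reducer discards the old aggregate and keeps the new value.
        Map<String, Long> table = reduceByKey(records, (aggValue, newValue) -> newValue);
        System.out.println(table); // prints {alice=3, bob=7}
    }
}
```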

Overall, you need to decide for yourself which approach you prefer:

In option 2, Kafka Streams will create an internal changelog topic to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it’s a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.

Matthias J. Sax answered Sep 26 '22 00:09