
Kafka Streams – best way to get KTable and KStream on same topic?

I have an issue with Kafka Streams (0.10.1.1). I'm trying to create a KStream and a KTable on the same topic.

The first approach I tried was simply calling the KStreamBuilder methods for stream and table on the same topic. This resulted in

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.

OK, this seems to be a restriction built into Kafka Streams.

My second approach was to create a KTable first and call the toStream() method on it. The problem is that KTables do some internal buffering/flushing, so the output stream does not reflect every input element when a key occurs multiple times, as it does in my example. That matters because I'm counting the occurrences of a key.
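To illustrate why the counts come out wrong, here is a minimal plain-Java sketch of the effect. It is only a model under simplifying assumptions (a single flush at the end, string keys and values), not the Kafka Streams implementation; the class and method names (`CacheModel`, `forwardAll`, `forwardOnFlush`, `countKey`) are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a KTable record cache; NOT the Kafka Streams code.
final class CacheModel {

    // No cache: every input record is forwarded downstream unchanged.
    static List<Map.Entry<String, String>> forwardAll(List<Map.Entry<String, String>> input) {
        return new ArrayList<>(input);
    }

    // With a cache: updates are buffered per key and only the latest value
    // per key is forwarded when the cache is flushed (here: once, at the end).
    static List<Map.Entry<String, String>> forwardOnFlush(List<Map.Entry<String, String>> input) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : input) {
            cache.put(record.getKey(), record.getValue()); // overwrites the earlier value
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        cache.forEach((k, v) -> out.add(Map.entry(k, v)));
        return out;
    }

    // Counts how many forwarded records carry the given key.
    static long countKey(List<Map.Entry<String, String>> records, String key) {
        return records.stream().filter(r -> r.getKey().equals(key)).count();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("a", "1"), Map.entry("b", "1"),
                Map.entry("a", "2"), Map.entry("a", "3"));
        System.out.println(countKey(forwardAll(input), "a"));     // 3
        System.out.println(countKey(forwardOnFlush(input), "a")); // 1
    }
}
```

In the real library the flush is triggered by the commit interval or by cache size, so the number of records that survive varies from run to run; the model only shows why a downstream count can come out lower than the number of input records.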

The approach that seems to work is to initially create a KStream, group it by key and then "reduce" it by discarding the old aggregate and just keeping the new value. I'm not too happy with this approach as a) it seems very complicated and b) the Reducer interface does not specify which one is the already aggregated value and which one is the new one. I went with convention and kept the second one, but ... meh.

So the question boils down to: is there a better way? Am I missing something blindingly obvious?

Please keep in mind that I'm not working on a proper use case – this is just me getting to know the Streams-API.

asked Feb 17 '17 19:02 by ftr



1 Answer

About adding a topic twice: that's not possible, because a Kafka Streams application is a single "consumer group" and thus can only commit offsets for a topic once, whereas adding a topic twice would imply that the topic gets consumed twice (with independent progress).

For the KTable#toStream() approach, you can disable caching by setting the StreamsConfig parameter cache.max.bytes.buffering = 0. However, this is a global setting and disables caching/deduplication for all KTables (cf. http://docs.confluent.io/current/streams/developer-guide.html#memory-management).
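As a rough sketch, the setting can be put into the Properties object that is handed to the Streams application. Only the key cache.max.bytes.buffering comes from the answer above; the class name `NoCacheConfig` and the application id and broker address are placeholder assumptions for illustration.

```java
import java.util.Properties;

// Hypothetical helper that builds a Streams configuration with caching disabled.
final class NoCacheConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder values (assumptions): replace with your own.
        props.put("application.id", "ktable-kstream-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // A cache size of 0 bytes turns off caching/deduplication for ALL KTables.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("cache.max.bytes.buffering"));
    }
}
```

With Kafka Streams on the classpath, the constant StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG can be used instead of the string literal for the cache key.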

Update: Since Kafka 1.0, it's possible to disable caching for each KTable individually via the Materialized parameter.

The groupBy approach also works, even if it requires some boilerplate. We are considering adding KStream#toTable() to the API to simplify this transformation. And yes, the second argument in reduce is the new value. Because reduce is for combining two values, the API has no concept of "old" and "new", and thus the parameters are not named that way.
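The "keep the second argument" convention can be sketched in plain Java, without the Kafka Streams API. This is a model only: `ReduceModel` and `reduceByKey` are hypothetical names, and java.util.Map#merge stands in for a grouped reduce because it calls the function with the current aggregate first and the incoming value second.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of groupByKey().reduce(...); NOT the Kafka Streams API.
final class ReduceModel {

    // Folds a sequence of records into a table holding one value per key.
    // The reducer receives (currentAggregate, newValue), in that order.
    // Note: Map#merge rejects null values, so this model does too.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> records, BinaryOperator<V> reducer) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            table.merge(record.getKey(), record.getValue(), reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("a", "1"), Map.entry("b", "1"), Map.entry("a", "2"));
        // Discard the old aggregate and keep the new value: table semantics.
        Map<String, String> latest = reduceByKey(records, (aggregate, newValue) -> newValue);
        System.out.println(latest.get("a")); // 2
    }
}
```

Returning the first argument instead would keep the earliest value per key, which is why the argument order matters for this trick.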

answered Oct 03 '22 06:10 by Matthias J. Sax