I have an issue with Kafka Streams (0.10.1.1). I'm trying to create a KStream and a KTable on the same topic.
The first approach I tried was simply calling the KStreamBuilder methods for stream and table on the same topic. This resulted in:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.
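A minimal sketch of the failing topology, using the 0.10.1-era KStreamBuilder API (the topic and store names are placeholders, not from the original post):

```java
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Sketch (requires kafka-streams on the classpath): registering the same
// topic as both a stream source and a table source.
public class DuplicateSourceDemo {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("my-topic");                  // first source: OK
        builder.table("my-topic", "my-topic-store"); // second source: throws
        // TopologyBuilderException: Invalid topology building:
        // Topic my-topic has already been registered by another source.
    }
}
```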
OK, this seems to be some restriction built into Kafka Streams.
My second approach was to initially create a KTable and use the toStream() method on it. The issue here is that KTables do some internal buffering/flushing, so the output stream does not reflect all input records when a key occurs multiple times, as in my example. This is a problem because I'm counting the occurrences of each key.
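The effect of that buffering can be shown with a toy model in plain Java (this is an illustration of the behavior, not the real Streams cache implementation): a cache that keeps only the latest value per key and forwards downstream on flush loses intermediate updates, which breaks counting.

```java
import java.util.*;

// Toy model of why a cached KTable drops intermediate updates: the cache
// keeps only the latest value per key and forwards downstream only on flush.
public class CacheDedupDemo {
    /** Returns the values a downstream operator would see. */
    static List<Integer> forward(List<Map.Entry<String, Integer>> input, boolean caching) {
        if (!caching) {                     // no cache: every update is forwarded
            List<Integer> out = new ArrayList<>();
            for (Map.Entry<String, Integer> r : input) out.add(r.getValue());
            return out;
        }
        Map<String, Integer> cache = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : input) cache.put(r.getKey(), r.getValue());
        return new ArrayList<>(cache.values()); // single flush: latest per key only
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input =
                List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("a", 3));
        System.out.println(forward(input, false).size()); // 3 updates seen
        System.out.println(forward(input, true).size());  // only 1 survives the cache
    }
}
```

A downstream counter would see three updates for key "a" without caching, but only one after a cache flush.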
The approach that seems to work is to initially create a KStream, group it by key, and then "reduce" it by discarding the old aggregate and keeping only the new value. I'm not too happy with this approach because a) it seems very complicated and b) the Reducer interface does not specify which argument is the already-aggregated value and which is the new one. I went with convention and kept the second one, but ... meh.
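For reference, a sketch of that workaround in the 0.10.1-era API (topic and store names are placeholders):

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Sketch (requires kafka-streams on the classpath): read the topic once as a
// KStream, then derive a KTable by reducing each key to its latest value.
public class StreamToTableDemo {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("my-topic");
        KTable<String, String> table = stream
                .groupByKey()
                .reduce((aggValue, newValue) -> newValue, "my-topic-store");
        // 'table' now upserts the latest value per key, while 'stream'
        // still sees every record.
    }
}
```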
So the question boils down to: is there a better way? Am I missing something blindingly obvious?
Please keep in mind that I'm not working on a proper use case – this is just me getting to know the Streams API.
A KStream models a stream of independent records, while a KTable models a changelog stream that tracks the latest state per key: each record is interpreted as an update to its key. (There is also a third abstraction, GlobalKTable, for non-partitioned tables.)
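The difference between the two read semantics can be illustrated with a toy example in plain Java (an illustration only, with made-up keys and values):

```java
import java.util.*;

// Toy illustration: a KStream treats each record as an independent fact
// ("insert"), while a KTable treats records with the same key as updates
// ("upsert"), so only the latest value per key remains.
public class StreamVsTableDemo {
    static List<Map.Entry<String, Integer>> records = List.of(
            Map.entry("alice", 1), Map.entry("alice", 2), Map.entry("bob", 5));

    static int streamSemantics() {                 // every record counts
        return records.size();
    }

    static Map<String, Integer> tableSemantics() { // latest value per key wins
        Map<String, Integer> table = new TreeMap<>();
        for (Map.Entry<String, Integer> r : records) table.put(r.getKey(), r.getValue());
        return table;
    }

    public static void main(String[] args) {
        System.out.println(streamSemantics()); // 3
        System.out.println(tableSemantics());  // {alice=2, bob=5}
    }
}
```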
About adding a topic twice: that's not possible, because a Kafka Streams application is a single consumer group and thus can only commit one offset per topic partition, while adding a topic twice would mean the topic gets consumed twice, with independent progress for each source.
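A toy model of that constraint in plain Java (an illustration of the idea, not the broker's actual commit protocol): committed offsets are keyed only by group, topic, and partition, so two independent readers of the same topic within one group would clobber each other's position.

```java
import java.util.*;

// Toy model: there is exactly one committed-offset slot per
// (group, topic, partition), so a second independent reader of the same
// topic in the same group overwrites the first reader's progress.
public class OffsetCommitDemo {
    static Map<String, Long> committed = new HashMap<>(); // key: group:topic-partition

    static void commit(String group, String topicPartition, long offset) {
        committed.put(group + ":" + topicPartition, offset);
    }

    public static void main(String[] args) {
        commit("my-app", "my-topic-0", 42L); // hypothetical "stream" source at offset 42
        commit("my-app", "my-topic-0", 7L);  // hypothetical "table" source at offset 7
        // Only one slot exists, so the first reader's progress is lost:
        System.out.println(committed.get("my-app:my-topic-0")); // 7
    }
}
```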
For the KTable#toStream() approach, you can disable caching via the StreamsConfig parameter cache.max.bytes.buffering = 0. However, this is a global setting and disables caching/deduplication for all KTables (cf. http://docs.confluent.io/current/streams/developer-guide.html#memory-management).
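A sketch of that configuration (the application id and bootstrap servers are placeholders; "cache.max.bytes.buffering" is the literal config name behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG):

```java
import java.util.Properties;

// Disabling the record cache globally for a Streams application.
public class NoCacheConfigDemo {
    static Properties config() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-app");            // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("cache.max.bytes.buffering", "0");      // disable caching
        return props;
    }

    public static void main(String[] args) {
        System.out.println(config().getProperty("cache.max.bytes.buffering")); // 0
    }
}
```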
Update: In newer Kafka versions it's possible to disable caching for each KTable individually via a Materialized parameter.
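A sketch of the per-table variant for newer Streams versions (StreamsBuilder API; topic and store names are placeholders):

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch (requires kafka-streams on the classpath): caching is disabled
// for this one KTable only; all other tables keep their caches.
public class PerTableCachingDemo {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder.table(
                "my-topic",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("my-topic-store")
                        .withCachingDisabled());
    }
}
```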
The groupBy approach works as well, even if it requires some boilerplate. We are considering adding KStream#toTable() to the API to simplify this transformation. And yes, the second argument in reduce is the new value: as reduce is meant for combining two values, the API has no concept of "old" and "new", and thus the parameters are not named that way.