Is there a way to use the same topic as the source for two different processing routines, when using Kafka Streams DSL?
StreamsBuilder streamsBuilder = new StreamsBuilder();
// use the topic as a stream
streamsBuilder.stream("topic")...
// use the same topic as a source for KTable
streamsBuilder.table("topic")...
return streamsBuilder.build();
Naive implementation from above throws a TopologyException
at runtime: Invalid topology: Topic topic has already been registered by another source. Which is totally valid, if we dive into underlying Processor API. Is using it the only way out?
UPDATE: The closest alternative I've found so far:
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<Object, Object> stream = streamsBuilder.stream("topic");
// use the topic as a stream
stream...
// create a KTable from the KStream
stream.groupByKey().reduce((oldValue, newValue) -> newValue)...
return streamsBuilder.build();
Here is the anatomy of an application that uses the Kafka Streams API. It provides a logical view of a Kafka Streams application that contains multiple stream threads, that each contain multiple stream tasks.
The topic is the most important abstraction provided by Kafka: it is a category or feed name to which data is published by producers. Every topic in Kafka is split into one or more partitions. Kafka partitions data for storing, transporting, and replicating it. Kafka Streams partitions data for processing it.
The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Reading the same topic as stream and as table is semantically questionable IMHO. Streams model immutable facts, while changelog topic that you would use to read into a KTable model updates.
If you want to use a single topic in multiple streams, you can reuse the same KStream
object multiple times (it's semantically like a broadcast):
KStream stream = ...
stream.filter();
stream.map();
Also compare: https://issues.apache.org/jira/browse/KAFKA-6687 (there are plans to remove this restriction. I doubt, we will allow to use one topic as KStream
and KTable
at the same time though—compare my comment from above).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With