Is there functionality built into Kafka Streams that allows for dynamically connecting a single input stream into multiple output streams? KStream.branch allows branching based on true/false predicates, but this isn't quite what I want. I'd like each incoming log to determine the topic it will be streamed to at runtime, e.g., a log {"date": "2017-01-01"} will be streamed to the topic topic-2017-01-01 and a log {"date": "2017-01-02"} will be streamed to the topic topic-2017-01-02.
I could call foreach on the stream, then write to a Kafka producer, but that doesn't seem very elegant. Is there a better way to do this within the Streams framework?
A KTable treats each record as an update to the latest value for its key. This means that when using a KTable, keys are required, although they aren't required when using a KStream. By overwriting records, a KTable creates a completely different data structure from a KStream, even given the same source records.
To create a Kafka topic programmatically, introduce a configuration class annotated with @Configuration: this annotation indicates that the Java class can be used by Spring as a source of bean definitions. Alongside the name of the Kafka topic, you can specify the number of partitions for the topic.
Kafka Streams is an API for writing client applications that transform data in Apache Kafka. You usually do this by publishing the transformed data onto a new topic. The data processing itself happens within your client application, not on a Kafka broker. Kafka Connect is an API for moving data into and out of Kafka.
Only the Kafka Streams DSL has the notion of a KTable. A KTable is an abstraction of a changelog stream, where each data record represents an update.
If you want to create topics dynamically based on your data, you do not get any support within Kafka's Streams API at the moment (v0.10.2 and earlier). You will need to create a KafkaProducer and implement your dynamic "routing" yourself (for example, using KStream#foreach() or KStream#process()). Note that you need to do synchronous writes to avoid data loss (which, unfortunately, are not very performant). There are plans to extend the Streams API with dynamic topic routing, but there is no concrete timeline for this feature right now.
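The manual routing described above could look roughly like the sketch below. It is only an illustration under some assumptions: RecordSender and DateTopicRouter are hypothetical names, not part of Kafka, and the date is pulled out with a simple regex where a real application would probably use a JSON parser. The Kafka wiring is shown in comments so the routing logic itself stays dependency-free.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical transport; in a real app this would wrap a KafkaProducer. */
interface RecordSender {
    // Expected to block until the write is acknowledged and to throw on failure.
    void sendAndWait(String topic, String value) throws Exception;
}

/** Hypothetical helper that picks the output topic from the log's "date" field. */
final class DateTopicRouter {
    // Assumption: logs are flat JSON with a "date": "YYYY-MM-DD" field.
    private static final Pattern DATE =
        Pattern.compile("\"date\"\\s*:\\s*\"(\\d{4}-\\d{2}-\\d{2})\"");

    private final RecordSender sender;

    DateTopicRouter(RecordSender sender) {
        this.sender = sender;
    }

    /** Derives the destination topic, e.g. topic-2017-01-01. */
    static String topicFor(String log) {
        Matcher m = DATE.matcher(log);
        if (!m.find()) {
            throw new IllegalArgumentException("no date field in: " + log);
        }
        return "topic-" + m.group(1);
    }

    /** Sends the log to its topic and fails loudly if the write does not succeed. */
    void route(String log) {
        try {
            sender.sendAndWait(topicFor(log), log);
        } catch (Exception e) {
            throw new IllegalStateException("write failed for: " + log, e);
        }
    }
}

// Possible wiring with the Kafka clients (not compiled here):
//   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//   DateTopicRouter router = new DateTopicRouter(
//       (topic, value) -> producer.send(new ProducerRecord<>(topic, value)).get());
//   stream.foreach((key, value) -> router.route(value));
```

Calling get() on the Future returned by send() is what makes the write synchronous: the stream thread does not move on to the next record until the broker has acknowledged this one, which is also why throughput suffers.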
There is one more consideration you should take into account. If you do not know your destination topic(s) ahead of time and just rely on the so-called "topic auto creation" feature, you should make sure that those topics are being created with the desired configuration settings (e.g., number of partitions or replication factor).
As an alternative to "topic auto creation", you can also use the Admin Client (available since v0.10.1) to create topics with the correct configuration. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations
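One way to apply this is to create each destination topic explicitly the first time it is seen, before the first write to it. The sketch below is a minimal illustration, not a definitive implementation: TopicCreator and TopicEnsurer are hypothetical names, and the comments at the end assume a client version that ships the Java AdminClient class (org.apache.kafka.clients.admin.AdminClient).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical hook; in a real app this would call the admin API. */
interface TopicCreator {
    void create(String topic, int partitions, short replicationFactor);
}

/** Hypothetical helper that creates each topic once with the desired settings. */
final class TopicEnsurer {
    // Topics this process has already asked to create.
    private final Set<String> known = ConcurrentHashMap.newKeySet();
    private final TopicCreator creator;
    private final int partitions;
    private final short replicationFactor;

    TopicEnsurer(TopicCreator creator, int partitions, short replicationFactor) {
        this.creator = creator;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    /** Returns true if a create request was issued, false if the topic was already known. */
    boolean ensure(String topic) {
        if (!known.add(topic)) {
            return false;
        }
        try {
            creator.create(topic, partitions, replicationFactor);
        } catch (RuntimeException e) {
            known.remove(topic); // forget the topic so a later call can retry
            throw e;
        }
        return true;
    }
}

// Possible wiring with the Kafka admin client (not compiled here):
//   AdminClient admin = AdminClient.create(props);
//   TopicCreator creator = (topic, p, rf) -> {
//       try {
//           admin.createTopics(Collections.singleton(new NewTopic(topic, p, rf))).all().get();
//       } catch (Exception e) {
//           throw new RuntimeException(e);
//       }
//   };
```

The in-memory set only tracks what this process has created, so a real creator would also need to tolerate topics that already exist on the cluster, for example after a restart.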