I would like to fully understand the rules that Kafka Streams processors must obey with respect to the partitioning of a processor's input and of its state(s). Specifically, I would like to understand whether a state store may use keys that differ from the keys of the processor's input topic.
I have been doing some research on this, and the answers I found seem unclear and sometimes contradictory: one answer seems to suggest that the stores are totally independent and you can use any key, while another says that you should never use a store with a key different from the one in the input topic.
Thanks for any clarification.
State: Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, an important capability when implementing stateful operations.
Kafka's topics are divided into several partitions. While the topic is a logical concept in Kafka, a partition is the smallest storage unit, holding a subset of the records owned by a topic. Each partition is a single log file to which records are written in an append-only fashion.
Kafka Partitioning: Partitioning takes the single topic log and breaks it into multiple logs, each of which can live on a separate node in the Kafka cluster. This way, the work of storing messages, writing new messages, and processing existing messages can be split among many nodes in the cluster.
Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its downstream processors.
You have to distinguish between input partitions and store shards / changelog topic partitions to get the complete picture. It also depends on whether you use the DSL or the Processor API, because the DSL does some auto-repartitioning while the Processor API does not. Because the DSL compiles down to the Processor API, I'll start with the Processor API.
If you have a topic with, let's say, 4 partitions and you create a stateful processor that consumes this topic, you will get 4 tasks, each task running a processor instance that maintains one shard of the store. Note that the overall state is split into 4 shards, and each shard is basically isolated from the others.
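To make this concrete, here is a minimal Processor API sketch of such a stateful processor. All names (the topic `input-topic`, the store `counts`) are made up, and the `CountProcessor` implementation is sketched further below:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CountTopology {

    public static Topology build() {
        // One shard of this store is created per task, i.e., per input partition.
        StoreBuilder<KeyValueStore<String, Long>> counts =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("counts"),
                        Serdes.String(),
                        Serdes.Long());

        Topology topology = new Topology();
        // Source processor: consumes the (hypothetical) 4-partition input topic.
        topology.addSource("Source",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "input-topic");
        // Stateful processor: one instance per task, i.e., per partition.
        topology.addProcessor("Count", CountProcessor::new, "Source");
        // Connecting the store makes Kafka Streams create its changelog topic
        // with the same number of partitions as the input topic.
        topology.addStateStore(counts, "Count");
        return topology;
    }
}
```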
From a Processor API runtime point of view, an input topic partition and the corresponding state store shards (including their changelog topic partitions) form a unit of parallelism. Hence, the changelog topic for the store is created with 4 partitions, and changelog-topic-partition-X is mapped to input-topic-partition-X. Note that Kafka Streams does not use hash-based partitioning when writing into a changelog topic, but provides the partition number explicitly, to ensure that "processor instance X", which processes input-topic-partition-X, only reads from / writes into changelog-topic-partition-X.
Thus, the runtime is agnostic to keys, so to speak.
If your input topic is not partitioned by key, messages with the same key will be processed by different tasks. Depending on the program, this might be OK (e.g., filtering) or not (e.g., counting per key).
Similarly for state: you can put any key into a state store, but this key is "local" to the corresponding shard. Other tasks will never see this key. Thus, if you use the same key in a store on different tasks, the entries will be completely independent of each other (as if they were two different keys).
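To illustrate, here is a sketch of the `CountProcessor` referenced in the topology above; the comments point out where the shard locality matters:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountProcessor implements Processor<String, String, Void, Void> {

    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        // Returns this task's shard of the "counts" store, never a global view.
        store = context.getStateStore("counts");
    }

    @Override
    public void process(final Record<String, String> record) {
        // get()/put() only touch the local shard: if records with the same key
        // end up in different input partitions, each task keeps its own,
        // completely independent count for that key.
        final Long count = store.get(record.key());
        store.put(record.key(), count == null ? 1L : count + 1);
    }
}
```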
Using the Processor API, it's your responsibility to partition the input data correctly and to use the stores correctly, depending on the operator semantics you need.
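For example, one way to handle a key change manually in the Processor API is to pipe the data through an intermediate topic (which you must create yourself, with the desired number of partitions) and read it back. The following is a rough sketch; `KeyChangerProcessor` is a hypothetical processor that sets the new key:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class ManualRepartitioning {

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("RawSource",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "raw-input");
        // Hypothetical processor that replaces the record key.
        topology.addProcessor("KeyChanger", KeyChangerProcessor::new, "RawSource");

        // Write to an intermediate topic, partitioned by the new key. The
        // default sink partitioner hashes the key anyway; spelling it out
        // just makes the contract visible.
        StreamPartitioner<String, String> byKey =
                (topic, key, value, numPartitions) ->
                        (key.hashCode() & 0x7fffffff) % numPartitions;
        topology.addSink("Repartition", "repartitioned-topic",
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                byKey, "KeyChanger");

        // Read the data back: records with the same key are now in the same
        // partition, so they are processed by the same task and hence hit the
        // same store shard (store wiring as in the first sketch).
        topology.addSource("PartitionedSource",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "repartitioned-topic");
        topology.addProcessor("Count", CountProcessor::new, "PartitionedSource");
        return topology;
    }
}
```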
At the DSL level, Kafka Streams makes sure that data is partitioned correctly to ensure correct operator semantics. First, it's assumed that input topics are partitioned by key. If the key is modified, for example via selectKey(), and a downstream operator is an aggregation, Kafka Streams first repartitions the data to ensure that records with the same key end up in the same topic partition. This guarantees that each key is used in a single store shard. Thus, the DSL always partitions the data such that one key is never processed on different shards.
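Here is a minimal DSL sketch of that situation (the topic name `orders` and the `extractCustomerId()` helper are hypothetical):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, Long> ordersPerCustomer = builder
        .stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
        // Key change: the stream is no longer guaranteed to be partitioned
        // by the new key, so the DSL flags it for repartitioning.
        .selectKey((orderId, order) -> extractCustomerId(order))
        // count() needs all records of one key in one store shard, so the
        // DSL inserts an internal repartition topic before the aggregation.
        .groupByKey()
        .count();
```

You can verify the inserted repartition topic (its name carries a `-repartition` suffix) by printing `builder.build().describe()`.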