We are currently implementing a process (using the Kafka Streams Processor API) where we need to combine information from two correlated events (messages) on a topic and then forward the combined information. The events originate from IoT devices, and since we want to keep them in order, the source topic uses a device identifier as key. The events also contain a correlation ID:
Key
{ deviceId: "..." }
Message
{ deviceId: "...", correlationId: "...", data: ...}
Our first approach was to create a Processor with a connected state store that stores every incoming message, using the correlation ID as key. That lets us query the store for the correlation ID of an incoming message; if the store already holds a message with the same ID, we combine the information, forward a new event, and remove the entry from the store. So for every correlation ID the following happens: at some point the first message with that ID is consumed and stored, and at some later point the second message with that ID causes the store entry to be removed.
State Key
{ correlationId: "..." }
State Value
{ event: { deviceId: "...", correlationId: "...", data: ... }}
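The pairing logic described above can be sketched roughly as follows. This is a plain-Java model, not Kafka Streams code: a HashMap stands in for the state store, and the names CorrelationPairing, Event, and Combined are hypothetical, introduced only for illustration. How two events are actually merged is application-specific and is assumed here to be simple concatenation of the two payloads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the pairing logic; the HashMap stands in for the state store. */
final class CorrelationPairing {

    /** Hypothetical event type mirroring the message shape shown above. */
    static final class Event {
        final String deviceId;
        final String correlationId;
        final String data;

        Event(String deviceId, String correlationId, String data) {
            this.deviceId = deviceId;
            this.correlationId = correlationId;
            this.data = data;
        }
    }

    /** Hypothetical combined result (assumption: the payloads are simply kept side by side). */
    static final class Combined {
        final String correlationId;
        final String firstData;
        final String secondData;

        Combined(String correlationId, String firstData, String secondData) {
            this.correlationId = correlationId;
            this.firstData = firstData;
            this.secondData = secondData;
        }
    }

    // Stand-in for the state store, keyed by correlation ID.
    private final Map<String, Event> store = new HashMap<>();

    /** Stores the first event of a pair; combines and clears the entry on the second. */
    Optional<Combined> process(Event incoming) {
        Event first = store.remove(incoming.correlationId);
        if (first == null) {
            // No partner yet: remember this event and forward nothing.
            store.put(incoming.correlationId, incoming);
            return Optional.empty();
        }
        // Partner found: the entry is already removed, so emit the combination.
        return Optional.of(new Combined(incoming.correlationId, first.data, incoming.data));
    }

    /** Number of events still waiting for their partner. */
    int pendingCount() {
        return store.size();
    }

    public static void main(String[] args) {
        CorrelationPairing pairing = new CorrelationPairing();
        // First event of the pair: stored, nothing forwarded yet.
        System.out.println(pairing.process(new Event("dev-1", "corr-1", "a")).isPresent());
        // Second event with the same correlation ID: combined and removed from the store.
        System.out.println(pairing.process(new Event("dev-1", "corr-1", "b")).isPresent());
    }
}
```

In a real Processor the map would be replaced by the connected key-value store, and the combined event would be forwarded downstream instead of returned.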
But now we are wondering how Kafka Streams handles the different keys. We are using a microservice approach, and there will be multiple instances of that service running. The store is automatically backed by an internal topic. Consider re-scaling the service instances such that the partitions of the source topic and the state topic are rebalanced. Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID? Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?
Thanks in advance!
Kafka's APIs include the Admin API for inspecting and managing Kafka objects such as topics and brokers, the Producer API for writing (publishing) to topics, the Consumer API for reading (subscribing to) topics, and the Kafka Streams API, which gives applications and microservices access to higher-level stream processing functions.
One of Kafka's four core APIs is the Streams API. By using the Streams API, we can easily develop a streaming processor, which is an application that takes continual streams of data from input topics, performs some processing on this data, and produces continual streams of data to output topics.
Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
If I understand your setup correctly, then yes, the approach is correct and (re)scaling will just work.
TL;DR: If a stream task is moved from machine A to machine B, then all its state will be moved as well, regardless of how that state is keyed (in your case it happens to be keyed by correlationId).
In more detail:
- A stream task is assigned a fixed set of input partitions (in your case, keyed by deviceId). This ensures that, even when stream tasks are being moved across machines/VMs/containers, they will always see "their" input partitions = their input data.
- A stream task can have one or more state stores (in your case, one store keyed by correlationId). What's important for your question is that it does not matter how the state is keyed. It's only important how the input partitions are keyed, because that determines which data flows from the input topic to a specific stream task (see previous bullet point). When a stream task is being moved across machines/VMs/containers, all its state will be moved as well so that it always has "its own" state available.
> The store is automatically backed by an internal topic.
As you already suggested, the fact that a store has an internal topic (for fault-tolerance and for elastic scaling, because that internal topic is used to reconstruct a state store when its stream task was moved from A to B) is an implementation detail. As a developer using the Kafka Streams API, the handling of state store recovery is automagically and transparently done for you.
When a stream task is being moved, and thus its state store(s), then Kafka Streams knows how it needs to reconstruct the state store(s) at the new location of the stream task. You don't need to worry about that.
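To make the relationship between input partitions, stream tasks, and state more concrete, here is a small plain-Java model. It is only a sketch under stated assumptions, not Kafka Streams code: TaskMigrationSketch and its methods are hypothetical names, the partitioner is a simplified stand-in based on String.hashCode() rather than Kafka's actual key hashing, and the store is modelled as a map that simply stays attached to its task (in Kafka Streams it would be rebuilt from the internal topic at the new location). It also assumes that both events of a pair carry the same deviceId, so they land in the same partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model: one task per input partition, each task owning its own store. */
final class TaskMigrationSketch {

    static final class Task {
        final int partition;
        // Stand-in for the task's state store: correlationId -> data of the first event.
        final Map<String, String> store = new HashMap<>();

        Task(int partition) {
            this.partition = partition;
        }
    }

    private final List<Task> tasks = new ArrayList<>();
    // partition -> name of the service instance currently hosting that task
    private final Map<Integer, String> assignment = new HashMap<>();

    TaskMigrationSketch(int partitions, String initialInstance) {
        for (int p = 0; p < partitions; p++) {
            tasks.add(new Task(p));
            assignment.put(p, initialInstance);
        }
    }

    /** Simplified stand-in for the producer's partitioner (assumption, not Kafka's real hash). */
    int partitionFor(String deviceId) {
        return Math.floorMod(deviceId.hashCode(), tasks.size());
    }

    /** Models a rebalance: the task, and with it its store, is now hosted elsewhere. */
    void moveTask(int partition, String instance) {
        assignment.put(partition, instance);
    }

    String instanceFor(String deviceId) {
        return assignment.get(partitionFor(deviceId));
    }

    /** Routes by deviceId only; returns the combined data on the second event, else null. */
    String process(String deviceId, String correlationId, String data) {
        Task task = tasks.get(partitionFor(deviceId));
        String first = task.store.remove(correlationId);
        if (first == null) {
            task.store.put(correlationId, data);
            return null;
        }
        return first + "+" + data;
    }

    public static void main(String[] args) {
        TaskMigrationSketch sketch = new TaskMigrationSketch(4, "instance-A");
        sketch.process("device-42", "corr-1", "first");
        // Rebalance between the two correlated events.
        sketch.moveTask(sketch.partitionFor("device-42"), "instance-B");
        System.out.println(sketch.instanceFor("device-42"));
        System.out.println(sketch.process("device-42", "corr-1", "second"));
    }
}
```

The point of the model is that routing depends only on the input key (deviceId), while the store belongs to the task rather than to a machine. The key used inside the store never takes part in any assignment decision.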
> Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID?
No (which is good). A stream task will always know how to reconstruct its state (1+ state stores), regardless of how that state itself is keyed.
> Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?
No (which is good).