
Kafka Processor API: Different key for Source and StateStore?

We are currently implementing a process (using the Kafka Processor API) where we need to combine information from two correlated events (messages) on a topic and then forward the combined information. The events originate from IoT devices, and since we want to keep them in order, the source topic uses a device identifier as key. The events also contain a correlation ID:

Key

{ deviceId: "..." }

Message

{ deviceId: "...", correlationId: "...", data: ...}

Our first approach was to create a Processor with a connected State Store, which stores every incoming message using the correlation ID as key. That enables us to query the store for the correlation ID of an incoming message, and if there already is a message with the same ID in the store, we can combine the information, forward a new event, and remove the entry from the store. So for every correlation ID the following happens: at some point the first message with that ID is consumed and stored, and at some later point in time the second message with that ID results in the store entry being removed. (A sketch of such a processor is shown after the examples below.)

State Key

{ correlationId: "..." }

State Value

{ event: { deviceId: "...", correlationId: "...", data: ... }}
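
For illustration, a minimal sketch of such a processor could look like the following, using the classic Processor API (exact interface signatures vary between Kafka Streams versions). The class name, the store name "pending-events", and the Event/CombinedEvent value types are placeholders, not something taken from the original setup:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Placeholder value types mirroring the message shapes above.
class Event {
    String deviceId;
    String correlationId;
    String data;
}

class CombinedEvent {
    Event first;
    Event second;
    CombinedEvent(Event first, Event second) { this.first = first; this.second = second; }
}

public class CorrelationProcessor implements Processor<String, Event> {

    private ProcessorContext context;
    private KeyValueStore<String, Event> pendingEvents;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "pending-events" is an assumed store name; it must match the store
        // that is registered in the topology and connected to this processor.
        this.pendingEvents = (KeyValueStore<String, Event>) context.getStateStore("pending-events");
    }

    @Override
    public void process(String deviceId, Event event) {
        Event earlier = pendingEvents.get(event.correlationId);
        if (earlier == null) {
            // First event for this correlation ID: remember it.
            pendingEvents.put(event.correlationId, event);
        } else {
            // Second event: combine both, forward downstream, and remove the store entry.
            // Keeping the deviceId as the output key preserves per-device ordering.
            context.forward(deviceId, new CombinedEvent(earlier, event));
            pendingEvents.delete(event.correlationId);
        }
    }

    @Override
    public void close() {}
}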

But now we are wondering how Kafka Streams handles the different keys. We are using a microservice approach, and there will be multiple instances of that service running. The store is automatically backed by an internal topic. Consider re-scaling the service instances, such that the partitions of the source topic and the state topic are re-balanced. Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID? Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?

Thanks in advance!

raptor206 asked Apr 18 '18

People also ask

What are the 4 major Kafka APIs?

The Admin API for inspecting and managing Kafka objects like topics and brokers. The Producer API for writing (publishing) to topics. The Consumer API for reading (subscribing to) topics. The Kafka Streams API for giving applications and microservices access to higher-level stream processing functions.

Which API can be used to stream messages from a Kafka topic for processing?

One of Kafka's four core APIs is the Streams API. By using the Streams API, we can easily develop a streaming processor, which is an application that takes continual streams of data from input topics, performs some processing on this data, and produces continual streams of data to output topics.
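
As an example, a minimal Streams API application (using the DSL rather than the Processor API) could look like the following; the topic names and the application ID are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class UppercaseApp {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Read from an input topic, transform each value, write to an output topic.
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}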

Which processor consumes records from one or more Kafka?

Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream for its topology from one or more Kafka topics by consuming records from these topics and forwarding them to its downstream processors.

Which processor sends any received records from its upstream processors to a specified Kafka topic?

Sink Processor: A sink processor is a special type of stream processor that does not have downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.
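
To make the terminology concrete, here is a minimal sketch of a topology built with the Processor API that contains only a source processor and a sink processor; it simply pipes records from one topic to another. The topic and node names are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;

public class PassThroughTopology {
    public static Topology build() {
        Topology topology = new Topology();

        // Source processor: consumes records from "input-topic" and feeds them into the topology.
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                "input-topic");

        // Sink processor: writes everything it receives from its upstream processor
        // (here: the source) to "output-topic".
        topology.addSink("Sink", "output-topic",
                Serdes.String().serializer(), Serdes.String().serializer(),
                "Source");

        return topology;
    }
}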


1 Answer

If I understand your setup correctly, then yes, the approach is correct and (re)scaling will just work.

TL;DR: If a stream task is moved from machine A to machine B, then all its state will be moved as well, regardless of how that state is keyed (in your case it happens to be keyed by correlationId).

In more detail:

  • Kafka Streams assigns processing work to stream tasks. This happens by mapping input partitions to stream tasks in a deterministic manner, based on the message key in the input partitions (in your case: keyed by deviceId). This ensures that, even when stream tasks are being moved across machines/VMs/containers, they will always see "their" input partitions = their input data.
  • A stream task consists, essentially, of the actual processing logic (in your case: the Processor API code) and any associated state (in your case: you have one state store that is keyed by correlationId). What's important for your question is that it does not matter how the state is keyed. It's only important how the input partitions are keyed, because that determines which data flows from the input topic to a specific stream task (see previous bullet point). When a stream task is being moved across machines/VMs/containers, all of its state will be moved as well, so that it always has "its own" state available (see the topology sketch below).
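
To make this concrete, here is a hedged sketch of how the topology wiring for the setup from the question could look, reusing the CorrelationProcessor placeholder from earlier. The topic names, store name, and serde parameters are assumptions, and writing the serdes for the placeholder value types is left out. The important part is that the correlationId-keyed store is connected to the processor, so it lives inside the same stream task as the deviceId-keyed input partitions:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CorrelationTopology {

    // Serdes for the placeholder value types are passed in; their implementation is not shown.
    public static Topology build(Serde<Event> eventSerde, Serde<CombinedEvent> combinedSerde) {

        // State store keyed by correlationId. Its key type is independent of the
        // input topic's key (deviceId). The store is changelog-backed by default,
        // which is what allows it to be rebuilt when its stream task moves.
        StoreBuilder<KeyValueStore<String, Event>> pendingEvents =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("pending-events"),
                        Serdes.String(),
                        eventSerde);

        Topology topology = new Topology();
        // Source: partitioned by deviceId; this keying determines task assignment.
        topology.addSource("devices",
                Serdes.String().deserializer(), eventSerde.deserializer(),
                "device-events");
        topology.addProcessor("correlate", CorrelationProcessor::new, "devices");
        // Connecting the store to the processor puts it inside the same stream task,
        // so the store (and its changelog) always travels with the deviceId partitions.
        topology.addStateStore(pendingEvents, "correlate");
        topology.addSink("combined", "combined-events",
                Serdes.String().serializer(), combinedSerde.serializer(),
                "correlate");
        return topology;
    }
}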

The store is automatically backed by an internal topic.

As you already suggested, the fact that a store has an internal topic (for fault-tolerance and for elastic scaling, because that internal topic is used to reconstruct a state store when its stream task was moved from A to B) is an implementation detail. As a developer using the Kafka Streams API, the handling of state store recovery is automagically and transparently done for you.

When a stream task is being moved, and thus its state store(s), then Kafka Streams knows how it needs to reconstruct the state store(s) at the new location of the stream task. You don't need to worry about that.

Is it possible that the partition for a specific correlation ID is assigned to a different service instance than the partition for the corresponding device ID?

No (which is good). A stream task will always know how to reconstruct its state (1+ state stores), regardless of how that state itself is keyed.

Could we end up in a situation where the second event with the same correlation ID is consumed by a service instance that does not have access to the already stored first event?

No (which is good).

Michael G. Noll answered Sep 19 '22