The Kafka Streams engine maps each partition to exactly one worker (i.e. a Java app), so all messages in that partition are processed by that worker. I have the following scenario and am trying to understand whether it can still work.
I have a Topic A (with 3 partitions). The messages sent to it are partitioned randomly by Kafka (i.e. there is no key). Each message I send to it has a schema like the one below:
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
Since I have 3 partitions, and the messages are partitioned randomly across them, cars of the same model could be written to different partitions. For example
P1
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Toyota", color: "Blue", timeStampEpoch: 14334343342}
P2
{carModel: "Toyota", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
P3
{carModel: "Nissan", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
Now let's say I wanted to count the total number of cars seen by carModel. I write a Kafka Streams application that listens to topic A, maps messages by carModel, i.e.
carStream.map((key, value) -> KeyValue.pair(value["carModel"], value))
and writes the total to another topic B, a message of the form
{carModel: "Nissan", totalCount: 5}
I then launch 3 instances of it, all part of the same Consumer Group. Kafka would then efficiently map each partition to one of the workers. Example
P1 --> Worker A
P2 --> Worker B
P3 --> Worker C
However, since each worker sees only one partition, it sees only partial information for each car model. It misses data for the same car model from the other partitions.
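To make the concern concrete, here is a minimal plain-Java sketch (no Kafka involved) of what each worker would compute if it counted only the records in its own partition, with no repartitioning step. The class and method names (`PartialCounts`, `countByModel`) are made up for illustration, and each record is reduced to its carModel value from the example partitions above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartialCounts {
    // Counts car models within a single partition, i.e. what one worker sees.
    static Map<String, Integer> countByModel(List<String> partition) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String carModel : partition) {
            counts.merge(carModel, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // carModel values taken from the example partitions P1, P2, P3
        List<String> p1 = List.of("Honda", "Honda", "Toyota");
        List<String> p2 = List.of("Toyota", "Honda", "Nissan");
        List<String> p3 = List.of("Nissan", "Honda", "Nissan");

        System.out.println("Worker A (P1): " + countByModel(p1)); // {Honda=2, Toyota=1}
        System.out.println("Worker B (P2): " + countByModel(p2)); // {Toyota=1, Honda=1, Nissan=1}
        System.out.println("Worker C (P3): " + countByModel(p3)); // {Nissan=2, Honda=1}
    }
}
```

No single worker arrives at the true totals (Honda 4, Toyota 2, Nissan 3), which is exactly the partial view described above.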
Question: Is my understanding correct?
If it is, I can imagine that I could re-partition (i.e. reshuffle) my data by carModel for this use case to work.
But I want to make sure I'm not misunderstanding how this works, and that Kafka does not in fact somehow take care of the re-partitioning automatically after the map step in my application.
Kafka Streams will do the repartitioning of your data automatically. Your program will be something like:
stream.map(...).groupByKey().count();
For this pattern, Kafka Streams detects that you set a new key in map() and thus automatically creates a topic in the background to repartition the data for the groupByKey().count() step (as of v0.10.1, via KAFKA-3561).
Note that map() only "marks" the stream as requiring repartitioning; it is .groupByKey().count() that creates the topic for repartitioning. In this regard, repartitioning is "lazy", i.e., it is done only if required. Without .groupByKey().count(), no repartitioning would be introduced.
Basically, the program from above is executed in the same way as
stream.map(...).through("some-topic").groupByKey().count();
Kafka Streams automatically "inserts" the through() step and thus computes the correct result.
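Why the extra topic fixes the counts can be sketched in plain Java, without Kafka: once every record is routed to a partition chosen from its new key alone, all records for one carModel land in the same partition, so a per-partition count is the full count. This is only a simulation under stated assumptions. The names `RepartitionSketch`, `partitionFor`, `repartition` and `countByModel` are hypothetical, and `String.hashCode()` stands in for the murmur2 hash of the serialized key that Kafka's default partitioner uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RepartitionSketch {
    // Chooses a partition from the key alone, so equal keys always land together.
    // Assumption: String.hashCode() is a stand-in for Kafka's real key hashing.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Re-routes every record to the partition derived from its carModel key.
    static List<List<String>> repartition(List<List<String>> input, int numPartitions) {
        List<List<String>> output = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            output.add(new ArrayList<>());
        }
        for (List<String> partition : input) {
            for (String carModel : partition) {
                output.get(partitionFor(carModel, numPartitions)).add(carModel);
            }
        }
        return output;
    }

    // Counts car models within a single partition, i.e. what one worker sees.
    static Map<String, Integer> countByModel(List<String> partition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String carModel : partition) {
            counts.merge(carModel, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // carModel values from the question's randomly partitioned topic A
        List<List<String>> topicA = List.of(
                List.of("Honda", "Honda", "Toyota"),
                List.of("Toyota", "Honda", "Nissan"),
                List.of("Nissan", "Honda", "Nissan"));

        List<List<String>> repartitioned = repartition(topicA, 3);
        for (int i = 0; i < repartitioned.size(); i++) {
            // Each carModel shows up in exactly one of these per-partition counts.
            System.out.println("Partition " + i + ": " + countByModel(repartitioned.get(i)));
        }
    }
}
```

Which partition a given model ends up in depends on the hash, and some partitions may stay empty. The point is only that no model is split across partitions after the reshuffle.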
If you are using Kafka Streams 0.10.0, you need to create the repartition topic manually with the desired number of partitions, and you also need to add the call to through() to your code.