We have the following problem:
We want to listen on a certain Kafka topic and build its "history": for a specified key, extract some data, add it to the already existing list for that key (or create a new one if it does not exist), and put it to another topic, which has only a single partition and is highly compacted. Another app can then just listen on that topic and update its history list.
I'm wondering how this fits with the Kafka Streams library. We can certainly use aggregation:
    msgReceived.map((key, word) -> new KeyValue<>(key, word))
        .groupBy((k, v) -> k, stringSerde, stringSerde)
        .aggregate(String::new,
            (k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
            stringSerde, "summaries2")
        .to(stringSerde, stringSerde, "transaction-summary50");
which creates a local store backed by Kafka and uses it as a history table.
My concern is that, if we decide to scale such an app, each running instance will create a new backing topic ${applicationId}-${storeName}-changelog (I assume each app has a different applicationId). Each instance starts to consume the input topic, gets a different set of keys, and builds a different subset of the state. If Kafka decides to rebalance, some instances will start to miss some historic state in their local store, as they get a completely new set of partitions to consume from.
The question is: if I just set the same applicationId for each running instance, will it eventually replay all data from the very same Kafka topic, so that each running instance has the same local state?
Why would you create multiple apps with different IDs to perform the same job? Kafka Streams achieves parallelism through tasks:
An application’s processor topology is scaled by breaking it into multiple tasks.
More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application.
Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process messages one-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention.
If you need to scale your app, you can start new instances running the same app (same application ID), and some of the already assigned tasks will be reassigned to the new instance. The migration of the local state stores is handled automatically by the library:
When the re-assignment occurs, some partitions – and hence their corresponding tasks including any local state stores – will be “migrated” from the existing threads to the newly added threads. As a result, Kafka Streams has effectively rebalanced the workload among instances of the application at the granularity of Kafka topic partitions.
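To make the idea of a fixed set of tasks being redistributed more concrete, here is a toy model in plain Java. It is not the Kafka Streams assignor and uses no Kafka classes: the round-robin strategy, the class name TaskAssignmentSketch and the instance names are all assumptions made purely for illustration. The only point it shows is that the number of tasks stays fixed while their owners change when an instance joins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: this is NOT how Kafka Streams actually assigns tasks.
final class TaskAssignmentSketch {

    // Spread a fixed number of tasks (one per input partition in this model)
    // round-robin over the currently running instances.
    static Map<String, List<Integer>> assign(int taskCount, List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int task = 0; task < taskCount; task++) {
            String owner = instances.get(task % instances.size());
            assignment.get(owner).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One instance owns all four tasks.
        System.out.println(assign(4, Arrays.asList("instance-A")));
        // After a second instance with the same application ID starts,
        // the same four tasks are split between the two.
        System.out.println(assign(4, Arrays.asList("instance-A", "instance-B")));
    }
}
```

In this model tasks 1 and 3 move from instance-A to instance-B; in the real library the state belonging to a migrated task is handled for you, as the quote above describes.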
I recommend having a look at this guide.
My concern is that, if we decide to scale such an app, each running instance will create a new backing topic ${applicationId}-${storeName}-changelog (I assume each app has a different applicationId). Each instance starts to consume the input topic, gets a different set of keys, and builds a different subset of the state. If Kafka decides to rebalance, some instances will start to miss some historic state in their local store, as they get a completely new set of partitions to consume from.
Some assumptions are not correct:
Thus, if all instances use the same application ID, all running application instances will use the same changelog topic name, and what you intend to do should work out of the box.
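As a small illustration of why a shared application ID leads to a shared changelog topic, the sketch below derives the topic name from the ${applicationId}-${storeName}-changelog pattern quoted in the question. It uses only java.util.Properties rather than the Kafka Streams classes. "application.id" and "bootstrap.servers" are the usual configuration keys, while the values "history-builder" and "localhost:9092" and the class name SharedApplicationIdSketch are placeholders I made up.

```java
import java.util.Properties;

final class SharedApplicationIdSketch {

    // Naming pattern taken from the question: ${applicationId}-${storeName}-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Configuration that every instance of the app would start with.
    // The values are placeholders, not taken from the original post.
    static Properties instanceConfig() {
        Properties props = new Properties();
        props.put("application.id", "history-builder");
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        Properties first = instanceConfig();
        Properties second = instanceConfig();
        String a = changelogTopic(first.getProperty("application.id"), "summaries2");
        String b = changelogTopic(second.getProperty("application.id"), "summaries2");
        System.out.println(a);           // history-builder-summaries2-changelog
        System.out.println(a.equals(b)); // true
    }
}
```

Both instances resolve the same topic name because they share the application ID; giving each instance its own ID would produce a separate changelog topic per instance, which is the situation the question was worried about.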