I have a KTable with data that looks like this (key => value), where keys are customer IDs, and values are small JSON objects containing some customer data:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
I'd like to do some aggregations on this KTable, and basically keep a count of the number of records for each age_group. The desired KTable data would look like this:
"18-24" => 3
"25-30" => 1
Let's say Alice, who is in the 18-24 group above, has a birthday that puts her in the new age group. The state store backing the first KTable should now look like this:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
And I'd like the aggregated KTable to reflect this, e.g.:
"18-24" => 2
"25-30" => 2
I may be overgeneralizing the issue described here:
"In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue"
But I have only been able to calculate a running total so far, e.g. Alice's birthday would be interpreted as:
"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well
Edit: here is some additional behavior that I noticed that seems unexpected.
The topology I'm using looks like:
dataKTable = builder.table("compacted-topic-1", "users-json")
.groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
.count("age-range-counts")
Now, from the initial, empty state, everything looks like this:
compacted-topic-1
(empty)
dataKTable
(empty)
// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)
// count()
age-range-counts state store
(empty)
Now, let's send a couple of messages to compacted-topic-1, which is streamed as a KTable above. Here is what happens:
compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4
// count()
age-range-counts state store
18-24 => 0
So I'm wondering: am I using groupBy and count in the DSL correctly, or do I need to use something like reduce instead? I'm also not clear on when the add reducer and the subtract reducer get called, so any clarification around any of these points will be greatly appreciated.

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an "UPDATE" of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT).
If you have your original KTable containing id -> Json data (let's call it dataKTable), you should be able to get what you want via:

KTable<String, Long> countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key */)
                .count("someStoreName");

This should work for all versions of Kafka's Streams API.
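As a sanity check of the expected counts, the same group-and-count logic can be sketched in plain Java over the question's data (using just the age_group strings; this is not Streams code):

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java sketch of the group-by-age-range count over the records
// from the question, to show the expected per-group totals.
public class GroupCountSketch {
    // Values here are just the age_group strings, for brevity.
    static Map<String, Long> countByAgeGroup(Map<Integer, String> users) {
        return users.values().stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<Integer, String> users = Map.of(
                1, "25-30",   // John
                2, "18-24",   // Alice
                3, "18-24",   // Susie
                4, "18-24");  // Jerry
        // Expect 18-24 -> 3 and 25-30 -> 1, matching the question.
        System.out.println(countByAgeGroup(users));
    }
}
```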
Update
About the 4 values in the re-partitioning topic: that's correct. Each update to the "base KTable" writes a record for its "old value" and its "new value". This is required to update the downstream KTable correctly. The old value must be removed from one count and the new value must be added to another count. Because your (count) KTable is potentially distributed (i.e., sharded over multiple parallel running app instances), both records (old and new) might end up at different instances, because both might have different keys, and thus they must be sent as two independent records. (The record format should be more complex than what you show in your question, though.)
This also explains why you need a subtractor and an adder. The subtractor removes the old record from the aggregation result, while the adder adds the new record to the aggregation result.
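That subtract-then-add bookkeeping can be simulated in plain Java (a sketch of the idea, not the Streams internals): each upstream update is propagated as an (old group, new group) pair, and the count table is adjusted on both sides.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation of how the downstream count KTable is maintained: each
// upstream update carries its old and new value; the subtractor
// decrements the old group's count, the adder increments the new one's.
public class SubtractAddSketch {
    static void update(Map<String, Long> counts, String oldGroup, String newGroup) {
        if (oldGroup != null) {                  // subtractor: remove old value
            counts.merge(oldGroup, -1L, Long::sum);
        }
        if (newGroup != null) {                  // adder: add new value
            counts.merge(newGroup, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        // Initial inserts (no old value):
        update(counts, null, "25-30"); // John
        update(counts, null, "18-24"); // Alice
        update(counts, null, "18-24"); // Susie
        update(counts, null, "18-24"); // Jerry
        // Alice's birthday: old group "18-24", new group "25-30"
        update(counts, "18-24", "25-30");
        // counts now holds 18-24 -> 2 and 25-30 -> 2, as desired.
        System.out.println(counts);
    }
}
```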
Still not sure why you don't see the correct count in the result. How many instances do you run? Maybe try to disable the KTable cache by setting cache.max.bytes.buffering=0 in StreamsConfig.
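For reference, a sketch of what that setting might look like. The plain string key "cache.max.bytes.buffering" is used here so the snippet compiles without Kafka on the classpath; in real code you would use the StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG constant.

```java
import java.util.Properties;

// Sketch of disabling the record cache so every KTable update is
// forwarded downstream immediately instead of being buffered/compacted.
public class CacheConfigSketch {
    static Properties streamsConfig() {
        Properties props = new Properties();
        // Equivalent to StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
        props.put("cache.max.bytes.buffering", 0);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().get("cache.max.bytes.buffering"));
    }
}
```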