
Kafka Streams - updating aggregations on KTable

I have a KTable with data that looks like this (key => value), where keys are customer IDs, and values are small JSON objects containing some customer data:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

I'd like to do some aggregation on this KTable, basically keeping a count of the number of records for each age_group. The desired KTable data would look like this:

"18-24" => 3
"25-30" => 1

Let's say Alice, who is in the 18-24 group above, has a birthday that puts her into the next age group. The state store backing the first KTable should now look like this:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

And I'd like the resulting aggregated KTable results to reflect this. e.g.

"18-24" => 2
"25-30" => 2

I may be overgeneralizing the issue described here:

"In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue"

But so far I have only been able to calculate a running total, e.g. Alice's birthday would be interpreted as:

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well

Edit: here is some additional behavior I noticed that seems unexpected.

The topology I'm using looks like:

KTable<String, String> dataKTable = builder.table("compacted-topic-1", "users-json");

KTable<String, Long> ageRangeCounts = dataKTable
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts");
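(The getAgeRange helper is not shown in the question; a minimal stand-in, assuming the value is a flat JSON string so naive string scanning is enough — a real app would use a JSON library — might look like:)

```java
public class AgeRange {
    // Hypothetical stand-in for the question's getAgeRange helper.
    // Assumes a flat JSON value like { "name" : "Alice", "age_group": "18-24" }.
    static String getAgeRange(String json) {
        String marker = "\"age_group\":";
        int start = json.indexOf(marker) + marker.length();
        int open = json.indexOf('"', start);      // opening quote of the field value
        int close = json.indexOf('"', open + 1);  // closing quote
        return json.substring(open + 1, close);
    }

    public static void main(String[] args) {
        System.out.println(getAgeRange("{ \"name\" : \"Alice\", \"age_group\": \"18-24\" }")); // 18-24
    }
}
```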

1) Empty State

Now, from the initial, empty state, everything looks like this:

compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)

2) Send a couple of messages

Now, let's send a couple of messages to compacted-topic-1, which is consumed as the KTable above. Here is what happens:

compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0

So I'm wondering:

  • Is what I'm trying to do even possible using Kafka Streams 0.10.1 or 0.10.2? I've tried using groupBy and count in the DSL, but maybe I need to use something like reduce?
  • Also, I'm having a little trouble understanding the circumstances that lead to the add reducer and the subtract reducer being called, so any clarification on these points would be greatly appreciated.
asked Mar 09 '17 by foxygen



1 Answer

If you have your original KTable containing id -> JSON data (let's call it dataKTable), you should be able to get what you want via

KTable<String, Long> countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key */)
                .count("someStoreName");

This should work for all versions of Kafka's Streams API.

Update

About the 4 values in the re-partitioning topic: that's correct. Each update to the "base KTable" writes a record for its "old value" and its "new value". This is required to update the downstream KTable correctly: the old value must be removed from one count and the new value must be added to another count. Because your (count) KTable is potentially distributed (i.e., sharded over multiple parallel running app instances), the two records (old and new) might end up at different instances, because they might have different keys, and thus they must be sent as two independent records. (The record format is more complex than what you show in your question, though.)

This also explains why you need a subtractor and an adder: the subtractor removes the old record from the aggregation result, while the adder adds the new record to it.
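As an illustration (plain Java, not the Streams API; the group names are taken from the question), the count store is conceptually maintained like this when Alice's record changes groups:

```java
import java.util.Map;
import java.util.TreeMap;

public class CountMaintenance {
    // Simulates how the downstream count KTable is maintained:
    // an update to the base KTable emits the old value (routed to the
    // subtractor) and the new value (routed to the adder).
    static Map<String, Long> applyBirthday() {
        Map<String, Long> counts = new TreeMap<>();
        // Initial inserts: there is no old value, so only the adder fires.
        counts.merge("25-30", 1L, Long::sum); // John
        counts.merge("18-24", 1L, Long::sum); // Alice
        counts.merge("18-24", 1L, Long::sum); // Susie
        counts.merge("18-24", 1L, Long::sum); // Jerry

        // Alice's update produces TWO repartition records:
        counts.merge("18-24", -1L, Long::sum); // subtractor: remove old value
        counts.merge("25-30", 1L, Long::sum);  // adder: add new value
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(applyBirthday()); // {18-24=2, 25-30=2}
    }
}
```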

Still not sure why you don't see the correct count in the result. How many instances do you run? Maybe try disabling the KTable cache by setting cache.max.bytes.buffering=0 in StreamsConfig.
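A sketch of that config change (the constant CACHE_MAX_BYTES_BUFFERING_CONFIG exists as of 0.10.1; the application id below is made up):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "age-range-counter"); // hypothetical app id
// Disable the record cache so every single update is forwarded downstream
// immediately instead of being compacted in the cache first.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```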

answered Oct 08 '22 by Matthias J. Sax