We have the following high level DSL processing topology:
TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retentionTimeMs);
KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");
KTable<Windowed<K>, Pair<Long,Long>> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");
KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name");
In short, what we're doing above is: create windowed event counts and then use these windowed keys in the subsequent join and aggregate operations (which, in the KTable case, have no windowed variants).
The problem is this: the state stores of the join and aggregate operations have no retention mechanism and cause a space explosion on disk (RocksDB).
More specifically: the (hopping) windows cause a Cartesian product on the keys, and there is no mechanism to delete old windows.
The same problem would also occur if the KTable keys were not windowed but there were simply a large enough number of unique keys.
Note that the state stores backing table1 and table2 have no space issue, because the DSL provides them with a windowed store that manages dropping old windows. In the join and aggregate, however, we treat the windowed keys as "any old key", and the DSL does the same, using a non-windowed KeyValueStore.
This question is related to the following: KAFKA-4212, KAFKA-4273, confluent forum question
Are there any misunderstood concepts here? Is there a simple way to implement this topology using the DSL? If not, what are the suggested ways to implement it using the low-level API?
I think you could do something like this:
StreamsBuilder builder = new StreamsBuilder();
KStream<K,V> stream = builder.stream(/* pattern for both streams */);
KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
/* custom Transformer that sets the weaker grouping key right here
and puts the extracted component into the value before the aggregation;
additionally (that's why we need a Transformer) it gets the topic name from the
context object and enriches the value accordingly (ie, the third String argument in the output Tuple) */);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = enrichedStream.groupByKey().aggregate(
timeWindow,
/* initializer: return an empty Map;
aggregator:
for each input record, check if the Map already contains an entry for the Long key
(ie, the extracted component, first argument of the input Tuple<Long,V,String>);
if not, add a new map entry with Pair(0,0);
take the corresponding Pair from the Map and increase one
counter depending on the original topic that
is encoded in the input value (ie, Pair.first is the counter for the first topic and Pair.second is the counter for the second topic) */);
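The aggregator's per-record update step can be sketched in plain Java (the class and method names here are mine, and I use a long[2] in place of your Pair, counting from 0 where the Pair would start as Pair(0,0)):

```java
import java.util.HashMap;
import java.util.Map;

public class CountAggregator {
    // Update the per-window map: component -> {countFirstTopic, countSecondTopic}.
    // 'component' is the Long extracted from the original key;
    // 'topic' (enriched into the value by the Transformer) tells us
    // which counter of the pair to increase.
    public static Map<Long, long[]> updateCounts(Map<Long, long[]> agg,
                                                 long component,
                                                 String topic) {
        long[] pair = agg.computeIfAbsent(component, c -> new long[]{0, 0});
        if (topic.equals("s1")) {
            pair[0]++;   // counter for the first topic
        } else {
            pair[1]++;   // counter for the second topic
        }
        return agg;
    }
}
```

The actual Aggregator you pass to aggregate() would just delegate to logic like this, with the component and topic read from the Tuple value.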
Example:
Assume two input streams s1 and s2 with the following records (<TS,key,value>):
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>
In your original program, you would first count both streams individually (assuming a tumbling window of size 5), getting (omitting TS):
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>
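For a tumbling window of size 5, a record with timestamp ts falls into the window starting at floor(ts / 5) * 5. The counts above can be reproduced with a small plain-Java sketch (the class and helper names are mine):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowedCount {
    // Tumbling window: a record with timestamp ts belongs to the
    // window starting at floor(ts / size) * size.
    public static long windowStart(long ts, long size) {
        return (ts / size) * size;
    }

    // Count records per windowed key "W<windowIndex><key>" for tumbling
    // windows of the given size; ts[i] and keys[i] form one record.
    public static Map<String, Long> count(long[] ts, String[] keys, long size) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < ts.length; i++) {
            String windowedKey = "W" + (windowStart(ts[i], size) / size) + "<" + keys[i] + ">";
            counts.merge(windowedKey, 1L, Long::sum);
        }
        return counts;
    }
}
```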
After your left-join you get (result after all records got processed, omitting intermediates):
<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>
Now you re-group using a "weaker key", extract a key part into the value, and put all entries into a map, based on the extracted key part. Let's assume that we split our key based on "char" and "number" (ie, k1 is split into k as smallerKey and 1 as the extracted Long that goes into the value). After the aggregation you get (I denote the map as (k1 -> v1, k2 -> v2)):
<W0<k>, (1 -> <2,1>, 2 -> <1,2>)> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>
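The key split used here (k1 into the smaller key k plus the extracted component 1) could look like the following sketch (splitKey is a hypothetical helper, assuming keys are a non-digit prefix followed by digits):

```java
public class KeySplitter {
    // Split a key like "k1" into the smaller grouping key ("k")
    // and the numeric component (1L) that moves into the value.
    public static Object[] splitKey(String key) {
        int i = 0;
        while (i < key.length() && !Character.isDigit(key.charAt(i))) {
            i++;
        }
        return new Object[]{ key.substring(0, i), Long.parseLong(key.substring(i)) };
    }
}
```

In the real topology this logic would live inside the Transformer (for the key) together with the topic-name enrichment (for the value).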
If this example is correct (I might have misunderstood your problem description), you can do the same using transform/groupBy/aggregate as described above. The input was:
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>
The result of transform is (including TS):
<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>
Note that transform actually processes both streams as "one stream" because we used Pattern subscription -- thus, the output is just one stream with interleaving records from both original streams.
You now apply the same windowed aggregation to this stream (TS omitted; we show the result by alternating processing of one record per original input stream), written as inputRecord ==> outputRecord:
<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>)>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>
If you compare the latest record per key of this result with the result above, you see that both are the same.
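The step-by-step run above can be checked with a small self-contained simulation (the class and method names are mine; a long[2] stands in for the Pair, counting from 0 instead of null):

```java
import java.util.HashMap;
import java.util.Map;

public class AggregationReplay {
    // Replay the interleaved records; each record is {timestamp, component, topicIndex}
    // where topicIndex 0 = first topic (s1) and 1 = second topic (s2).
    // State: windowIndex -> (component -> {countS1, countS2}), tumbling windows of size 5.
    public static Map<Long, Map<Long, long[]>> replay(long[][] records) {
        Map<Long, Map<Long, long[]>> state = new HashMap<>();
        for (long[] r : records) {
            long window = r[0] / 5;   // tumbling window index
            Map<Long, long[]> agg = state.computeIfAbsent(window, w -> new HashMap<>());
            long[] pair = agg.computeIfAbsent(r[1], c -> new long[]{0, 0});
            pair[(int) r[2]]++;       // bump the counter for this record's topic
        }
        return state;
    }
}
```

Replaying the nine records from the table yields, per window, the same final maps as the join-based version (with 0 in place of null).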