Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

KTable state store infinite retention

We have the following high level DSL processing topology:

TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);

KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");


KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");

KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")

In short, what we're doing above is:

  1. Counting events using a hopping window
  2. Doing a left join between the resulting KTables (left because of business logic)
  3. Grouping and changing the key and value: taking a component (Long) of the key and moving to the value
  4. Aggregating the resulting KTable to a final KTable, the aggregation object is a map from T to the two joined counters from step 1. Note that the size of the map is no more than 600 and usually much less.

The idea is to create windowed event counts and use these windowed keys for the join and aggregate operations (which in the KTable case don't have windows on such operations)

The problem is this: The state stores of the join and aggregate operations have no retention mechanism and cause a space explosion in the disk (RocksDB).

More specifically: The (hopping) windows cause a cartesian product on the keys and there's no mechanism to delete old windows.

The same problem will also occur if the KTable key were not windowed, but just a large enough amount of unique keys

Note that the state stores backing table1 and table2 have no space issue, this is because they are provided a windowed store by the DSL which manages dropping old windows. In the join and aggregate we treat the windowed keys as "any old key" and the DSL does the same and uses a non-windowed KeyValueStore.

This question is related to the following: KAFKA-4212, KAFKA-4273, confluent forum question

Are there any misunderstood concepts here? Is there a simple way to implement this topology using the DSL? If not, what's the suggested ways to implement it using the low level API?

like image 589
glerman Avatar asked Nov 22 '17 16:11

glerman


1 Answers

I think you could do something like this:

StreamsBuilder builder = new StreamBuilder();
KStream<K,V> streams = builder.stream(/* pattern for both streams */);

KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
    /* custom Transformer that set the weaker grouping key right here
       and puts the extracted component into the value before the aggregation;
       additionally (that's why we need a Transformer) get the topic name from
       context object and enrich the value accordingly (ie, third String argument in the output Tuple */);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
    timeWindow,
    /* initializer: return an empty Map;
       aggregator:
       for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
         if not, add new map entry with Pair(0,0)
       take the corresponding Pair from the Map and increase one
       counter depending on the original topic that
       is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);

Example:

Assume two input stream s1 and s2 with the following records (<TS,key,value>):

s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

In your original program, you would first count both streams individually (assuming tumbling window of size 5) getting (omitting TS):

<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>  
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>  

After your left-join you get (result after all records got processed, omitting intermediates):

<<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>

Now you re-group using a "weaker key", extract a key part into the value, and put all entries into a map, bases on the extracted key part. Let's assume that we split our key based on "char" and "number" (ie, k1 is split into k as smallerKey and 1 is the extracted Long that goes into the value). After the aggregation you get (I denote the map as (k1 -> v1, k2 - v2):

<<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>

If this is a correct example (I might have miss understood your problem description). You can do the same using transform/groupBy/aggregate as describe above. The input was:

s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

The result of transform is (including TS):

<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>

Note, that Transform actually processes both streams as "one stream" because we used Pattern subscription -- thus, the output is just one stream with interleaving records from both original streams.

You now apply the same window with aggregation result (TS omitted) -- we show the result by alternating to process one record per original input stream) as inputRecord ==> outputRecord

<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>

If you compare the latest record per key of this result with the result above, you see that both are the same.

like image 144
Matthias J. Sax Avatar answered Nov 20 '22 02:11

Matthias J. Sax