 

How to get windowed aggregation from kafka stream?

I have a stream of events that I would like to aggregate by time window. My solution gives an incremental aggregation rather than one aggregation result per time window. I have read that this is normal for streams, since results are emitted as a changelog. During my research I also came across "2 step windowed aggregation with Kafka Streams DSL" and "How to send final kafka-streams aggregation result of a time windowed KTable?". However, the solution in the first post is somewhat outdated (it uses a deprecated API), so I used the newer API suggested in place of the deprecated one. This is my solution:

KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());

// Re-key each event by the configured grouping criterion
KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy(
        (key, value) -> getGroupBy(value, criteria),
        Serialized.with(Serdes.String(), eventSerde));

// advanceBy(windowSizeMs) equal to the window size gives non-overlapping (tumbling) windows
long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
        .windowedBy(TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs));
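Because `advanceBy(windowSizeMs)` equals the window size, these hopping windows behave as tumbling windows: each record lands in exactly one window. The plain-Java sketch below illustrates that arithmetic without any Kafka dependency. The class and method names are hypothetical, timestamps are assumed to be non-negative epoch milliseconds, and this is an illustration of the windowing rule rather than Kafka Streams' own implementation.

```java
import java.util.ArrayList;
import java.util.List;

class TumblingWindowSketch {

    // Start of the single tumbling window [start, start + size) containing timestampMs.
    // Assumes timestampMs >= 0 (hypothetical helper, not a Kafka API).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // General hopping case: every window start s that is a multiple of advanceMs
    // and satisfies s <= timestampMs < s + sizeMs, newest window first.
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % advanceMs);
        for (long s = lastStart; s >= 0 && s > timestampMs - sizeMs; s -= advanceMs) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60_000L; // 60-second windows
        System.out.println(windowStart(125_000L, size));                  // 120000
        System.out.println(hoppingWindowStarts(125_000L, size, size));    // [120000]
        System.out.println(hoppingWindowStarts(125_000L, size, 30_000L)); // [120000, 90000]
    }
}
```

With advance equal to size the hopping helper returns a single window, which is why the configuration in the question amounts to tumbling windows.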

However, as explained above, my results are not emitted once per time window but as incremental aggregation updates. I need the aggregate to be output once per time window, as specified by windowSize. I also read that CACHE_MAX_BYTES_BUFFERING_CONFIG can control the output, but I need a solid solution that works in every scenario. Also note that the patterns in the https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows wiki page are now outdated, as they use old APIs. (I'm using kafka-streams version 1.1.0.)
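To make the difference concrete, here is a stdlib-only Java simulation (hypothetical names, not Kafka Streams code) contrasting changelog-style output, where every input record emits the updated window aggregate, with one final result per window. In Kafka Streams, record caching sits between these two extremes: it collapses consecutive updates to the same key until a flush or commit, which reduces intermediate results but does not guarantee exactly one output per window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EmissionSketch {

    // Hypothetical input record: grouping key, event timestamp (ms), value to sum.
    static final class Rec {
        final String key;
        final long ts;
        final long value;

        Rec(String key, long ts, long value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Tumbling-window key "key@windowStart" (non-negative timestamps assumed).
    static String windowKey(Rec r, long sizeMs) {
        return r.key + "@" + (r.ts - (r.ts % sizeMs));
    }

    // Changelog style: every record updates its window's sum and the new sum is
    // emitted immediately, similar to a KTable forwarding each update uncached.
    static List<String> changelog(List<Rec> input, long sizeMs) {
        Map<String, Long> sums = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            String wk = windowKey(r, sizeMs);
            long sum = sums.merge(wk, r.value, Long::sum);
            out.add(wk + "=" + sum);
        }
        return out;
    }

    // Final-result style: one output per window, emitted only after all input is
    // consumed (end of input stands in for "the window has closed").
    static List<String> finalPerWindow(List<Rec> input, long sizeMs) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Rec r : input) {
            sums.merge(windowKey(r, sizeMs), r.value, Long::sum);
        }
        List<String> out = new ArrayList<>();
        sums.forEach((wk, sum) -> out.add(wk + "=" + sum));
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("a", 1_000L, 1L),
                new Rec("a", 2_000L, 2L),
                new Rec("a", 61_000L, 5L));
        long sizeMs = 60_000L;
        System.out.println(changelog(input, sizeMs));      // [a@0=1, a@0=3, a@60000=5]
        System.out.println(finalPerWindow(input, sizeMs)); // [a@0=3, a@60000=5]
    }
}
```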

Viraj asked Jun 18 '18 07:06


1 Answer

The problem was my own mistake. The code sample above works fine, but at the end I converted the KTable to a KStream, and that was the problem: converting to a KStream causes the intermediate results to be output as well. The pattern given in https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows works fine. My problematic code was:

// Aggregation
KTable<Windowed<String>, Event> results = groupedByKeyForWindow.aggregate(new AggregateInitiator(), new EventAggregator());

// This conversion causes the changelog (including intermediate results) to be output. Use the next line instead.
KStream<String, AggregationMessage> aggregationMessageKStream = results.toStream((key, value) -> key.toString())
        .mapValues(this::convertToAggregationMessage)
        .filter((k, v) -> v != null);

// Output the KTable to the "Sample" topic. This output is controlled by the
// COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG parameters;
// I'm using the default values for these params.
results.to(windowedSerde, eventSerde, "Sample");
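The two parameters named in the comment can be raised so the record cache absorbs more intermediate updates before forwarding them. A minimal sketch using only java.util.Properties: the string keys are the ones behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, while the helper name and the chosen values are illustrative assumptions. The documented defaults are 10485760 bytes (10 MB) and 30000 ms under at-least-once processing.

```java
import java.util.Properties;

class StreamsTuningSketch {

    // Same strings as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and
    // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; spelled out so this compiles without Kafka.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

    // Hypothetical helper: copies the base config and overrides the two settings
    // that govern how often cached KTable updates are flushed downstream.
    static Properties withLessFrequentFlushes(Properties base, long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(CACHE_MAX_BYTES_BUFFERING, Long.toString(cacheBytes));
        props.setProperty(COMMIT_INTERVAL_MS, Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "windowed-aggregation-example"); // illustrative value
        base.setProperty("bootstrap.servers", "localhost:9092");            // illustrative value

        // 50 MB cache and a 60 s commit interval (illustrative values).
        Properties tuned = withLessFrequentFlushes(base, 50L * 1024 * 1024, 60_000L);
        System.out.println(tuned.getProperty(CACHE_MAX_BYTES_BUFFERING)); // 52428800
        System.out.println(tuned.getProperty(COMMIT_INTERVAL_MS));        // 60000
    }
}
```

Larger values mean fewer, later updates per window, but as the question points out, this is tuning rather than a guarantee of exactly one result per window.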
Viraj answered Oct 26 '22 00:10