What I want to achieve is to group the messages I receive from a Kafka topic by user and window them, so that I can aggregate the messages that fall into each (5-minute) window. Then I'd like to collect all the aggregates of each window so I can process them at once, adding them to a report of all the messages received in that 5-minute interval.
The last point seems to be the tough part: Kafka Streams doesn't seem to provide (at least I can't find it!) anything that can collect all the window-related results into a "finite" stream to be processed in one place.
This is the code I implemented:
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<UserId, Message> messages = builder.stream("KAFKA_TOPIC");

TimeWindowedKStream<UserId, Message> windowedMessages =
        messages.groupByKey()
                .windowedBy(TimeWindows.of(SIZE_MS));

KTable<Windowed<UserId>, List<Message>> messagesAggregatedByWindow =
        windowedMessages.aggregate(
                LinkedList::new,
                new MyAggregator<>(),
                Materialized.with(new MessageKeySerde(), new MessageListSerde()));

messagesAggregatedByWindow.toStream()
        .foreach((key, value) ->
                log.info("({}), KEY {} MESSAGE {}", value.size(), key, value));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
The result is something like:
KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]
KEY [UserId(79117307)@1531502780000/1531502790000] Message [Message(userId=UserId(79117307),message="e")]
KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]
For each window there are many log lines, and lines belonging to different windows are interleaved.
What I'd like to have is something like:
// Hypothetical implementation
windowedMessages.streamWindows((interval, window) -> process(interval, window));
where the process method would be something like:
// Hypothetical implementation
void process(Interval interval, WindowStream<UserId, List<Message>> windowStream) {
    // Create the report for the whole window
    Report report = new Report(nameFromInterval());
    // Loop over the finite iterable that represents the window content
    for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
        report.addLine(entry.getKey(), entry.getValue());
    }
    report.close();
}
The result would be grouped like this (each report is one call to my callback void process(...)), and the offsets for each window would be committed only once the whole window has been processed:
Report 1:
KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]
Report 2:
KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]
Report 3:
KEY [UserId(79117307)@1531502780000/1531502790000] Message [Message(userId=UserId(79117307),message="e")]
I had the same doubt. I've talked with the developers of the library, and they said that this is a really common request but it isn't implemented yet. It will be released soon.
You can find more information here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
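Based on that KIP, the final-result-per-window API would look roughly like this. This is a sketch, not the shipped implementation: Suppressed, untilWindowCloses and the grace() period are the names proposed in the KIP, while UserId, Message, MyAggregator and the serdes are taken from your question:

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.streams.kstream.*;

// Give the window an explicit grace period so it can "close";
// records arriving later than that are dropped for this window.
KTable<Windowed<UserId>, List<Message>> messagesAggregatedByWindow =
        messages.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO))
                .aggregate(
                        LinkedList::new,
                        new MyAggregator<>(),
                        Materialized.with(new MessageKeySerde(), new MessageListSerde()));

// suppress() buffers the intermediate updates and emits a single,
// final aggregate per user per window once the window has closed.
messagesAggregatedByWindow
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((key, value) -> log.info("KEY {} MESSAGE {}", key, value));

Note that this still emits one final record per user per window rather than one callback per window: to build a single report per interval you would still need to group those final results downstream, for example by re-keying them on the window's start time.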