My Kafka topic contains statuses keyed by deviceId. I would like to use KStreamBuilder.stream().groupByKey().aggregate(...) to only keep the latest value of a status in a TimeWindow. I guess that, as long as the topic is partitioned by key, the aggregation function can always return the latest value in this fashion:

(key, value, older_value) -> value

Is this a guarantee I can expect from Kafka Streams? Should I roll my own processing method that checks the timestamp?
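For reference, a minimal sketch of the topology I have in mind (the topic name and the 5-minute window size are placeholders, and it uses the newer StreamsBuilder in place of KStreamBuilder):

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("device-status")          // placeholder topic name
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))   // placeholder window size
    .aggregate(
        () -> (String) null,                             // initializer: no status seen yet
        (key, value, older_value) -> value,              // always keep the newest record
        Materialized.with(Serdes.String(), Serdes.String()));
```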
Kafka Streams guarantees ordering by offset, but not by timestamp. Thus, by default the "last update wins" policy is based on offsets, not on timestamps. Late-arriving records ("late" as defined by timestamps) are out of order with respect to timestamps, and they will not be reordered: the original offset order is kept.
If you want your window to contain the latest value based on timestamps, you will need to use the Processor API (PAPI) to make this work.
Within Kafka Streams' DSL, you cannot access the record timestamp that is required to get the correct result. An easy way might be to put a .transform() before the .groupBy() and add the timestamp to the record (ie, to its value) itself. Thus, you can use the timestamp within your Aggregator (btw: a .reduce(), which is simpler to use, might also work instead of .aggregate()). Finally, you need to do a .mapValues() after your .aggregate() to remove the timestamp from the value again; a sketch of the whole recipe follows below.
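A minimal sketch of that recipe, with a few assumptions: String keys and values, placeholder topic names and window size, the timestamp encoded as a "<timestamp>:" prefix on the value so that the default String serde suffices, and transformValues() instead of transform() since only the value changes (plus reduce() instead of aggregate(), as suggested above):

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LatestStatusPerWindow {

    // Parse the timestamp prefix that transformValues() prepends below.
    private static long ts(String value) {
        return Long.parseLong(value.substring(0, value.indexOf(':')));
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("device-status")               // placeholder input topic
            // Prepend the record timestamp to the value: transformValues()
            // exposes the ProcessorContext, which plain DSL operators do not.
            .transformValues(() -> new ValueTransformer<String, String>() {
                private ProcessorContext context;
                @Override public void init(ProcessorContext context) { this.context = context; }
                @Override public String transform(String value) {
                    return context.timestamp() + ":" + value;
                }
                @Override public void close() {}
            })
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))        // placeholder window size
            // "Last update wins" by timestamp instead of by offset: keep
            // whichever value carries the larger embedded timestamp.
            .reduce((current, next) -> ts(next) >= ts(current) ? next : current)
            // Strip the timestamp prefix from the value again.
            .mapValues(value -> value.substring(value.indexOf(':') + 1))
            .toStream((windowedKey, value) -> windowedKey.key())      // drop the window from the key
            .to("latest-device-status");                              // placeholder output topic

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-status-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}
```

Using >= in the reducer means that on equal timestamps the record with the larger offset wins, which matches the DSL's default last-update-wins behavior.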
Using this mix-and-match approach of the DSL and the PAPI should simplify your code, as you can use the DSL's windowing support and KTable and do not need to do low-level time-window and state management yourself.
Of course, you could also do all of this in a single low-level stateful processor, but I would not recommend it.