Does the Kafka streams aggregation have any ordering guarantee?

My Kafka topic contains statuses keyed by deviceId. I would like to use KStreamBuilder.stream().groupByKey().aggregate(...) to keep only the latest value of a status within a TimeWindow. I guess that, as long as the topic is partitioned by key, the aggregation function can always return the latest value like this:

(key, value, older_value) -> value

Is this a guarantee I can expect from Kafka Streams? Should I roll my own processing method that checks the timestamp?
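For reference, here is a minimal sketch of the topology I have in mind (written against the newer StreamsBuilder API; "device-status" is a placeholder topic name, the 5-minute window is arbitrary, and serde configuration is omitted):

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> statuses = builder.stream("device-status");

    KTable<Windowed<String>, String> latestPerWindow = statuses
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))  // the TimeWindow in question
        .aggregate(
            () -> null,                            // no status seen yet in this window
            (deviceId, status, older) -> status);  // "last update wins"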

Asked Jan 09 '17 by Steve



1 Answer

Kafka Streams guarantees ordering by offset, but not by timestamp. Thus, by default the "last update wins" policy is based on offsets, not on timestamps. Late-arriving records ("late" is defined with respect to timestamps) are out of order by timestamp, and they will not be reordered: the original offset order is preserved. For example, if a record with timestamp 10 arrives at a larger offset than a record with timestamp 12, the offset-based "last update wins" result is the timestamp-10 record.

If you want your window to contain the latest value based on timestamps, you will need to use the Processor API (PAPI) to make this work.

Within Kafka Streams' DSL, you cannot access the record timestamp, which is required to get the correct result. An easy way is to put a .transform() before the .groupBy() and add the timestamp to the record itself (i.e., into its value). Then you can use the timestamp within your Aggregator (btw: a .reduce(), which is simpler to use, might also work instead of .aggregate()). Finally, you need a .mapValues() after the .aggregate() to remove the timestamp from the value again; see the sketch below.
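A minimal sketch of that pattern (TimestampedStatus is a hypothetical wrapper class, statuses is the input KStream<String, String> from the question, and serdes are omitted):

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.*;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Hypothetical wrapper that carries the record timestamp inside the value.
    class TimestampedStatus {
        final long timestamp;
        final String status;
        TimestampedStatus(long timestamp, String status) {
            this.timestamp = timestamp;
            this.status = status;
        }
    }

    // Step 1: .transform() copies the record timestamp into the value,
    // since the DSL aggregation itself cannot see it.
    KStream<String, TimestampedStatus> withTimestamp = statuses.transform(() ->
        new Transformer<String, String, KeyValue<String, TimestampedStatus>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<String, TimestampedStatus> transform(String deviceId, String status) {
                return KeyValue.pair(deviceId, new TimestampedStatus(context.timestamp(), status));
            }

            @Override
            public void close() {}
        });

    // Step 2: a windowed .reduce() keeps the value with the larger timestamp
    // (instead of the larger offset); .mapValues() strips the timestamp again.
    KTable<Windowed<String>, String> latestByTimestamp = withTimestamp
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
        .reduce((a, b) -> b.timestamp >= a.timestamp ? b : a)
        .mapValues(ts -> ts.status);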

This mix-and-match approach of DSL and PAPI should keep your code simple, as you can use the DSL's windowing support and KTable abstraction and do not need to do low-level time-window and state management yourself.

Of course, you can also just do all this in a single low-level stateful processor, but I would not recommend it.

Answered Oct 03 '22 by Matthias J. Sax