I wonder if there's any way to sort records within a window using Kafka Streams DSL or Processor API.
Imagine the following situation as an example (arbitrary one, but similar to what I need):
There is a Kafka topic of some events, let's say user clicks. Let's say topic has 10 partitions. Messages are partitioned by key, but each key is unique, so it's sort of a random partitioning. Each record contains a user id, which is used later to repartition the stream.
We consume the stream, and publish each message to another topic partitioning the record by it's user id (repartition the original stream by user id).
Then we consume this repartitioned stream, and we store consumed records in local state store windowed by 10 minutes. All clicks of a particular user are always in the same partition, but order is not guarantied, because the original topic had 10 partitions.
I understand the windowing model of Kafka Streams, and that time is advanced when new records come in, but I need this window to use processing time, not the event time, and then when window is expired, I need to be able to sort buffered events, and emit them in that order to another topic.
Notice:
We need to be able to flush/process records within the window using processing time, not the event time. We can't wait for the next click to advance the time, because it may never happen.
We need to remove all the records from the store, as soon window is sorted and flushed.
If application crashes, we need to recover (in the same or another instance of the application) and process all the windows that were not processed, without waiting for new records to come for a particular user.
I know Kafka Streams 1.0.0 allows to use wall clock time in Processing API, but I'm not sure what would be the right way to implement what I need (more importantly taking into account the recovery process requirement described above).
Windowing. Windowing allows you to bucket stateful operations by time, without which your aggregations would endlessly accumulate. A window gives you a snapshot of an aggregate within a given timeframe, and can be set as hopping, tumbling, session, or sliding.
Kafka Streams guaranteed ordering by offsets but not by timestamp. Thus, by default "last update wins" policy is based on offsets but not on timestamp. Late arriving records ("late" defined on timestamps) are out-of-order based on timestamps and they will not be reordered to keep original offsets order.
A sliding window used for aggregating events. Sliding Windows are defined based on a record's timestamp, the window size based on the given maximum time difference (inclusive) between records in the same window, and the given window grace period.
The difference to event time is that this ingestion timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created "at the source". The difference to processing time is that processing time is when the stream processing application processes the record.
You can see my answer to a similar question here: https://stackoverflow.com/a/44345374/7897191
Since your message keys are already unique you can ignore my comments about de-duplication.
Now that KIP-138 (wall-clock punctuation semantics) has been released in 1.0.0 you should be able to implement the outlined algorithm without issues. It uses the Processor API. I don't know of a way of doing this with only the DSL.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With