This question covers how to sort an out-of-order stream using Flink SQL, but I would rather use the DataStream API. One solution is to do this with a ProcessFunction that uses a PriorityQueue to buffer events until the watermark indicates they are no longer out-of-order, but this performs poorly with the RocksDB state backend (the problem is that each access to the PriorityQueue will require ser/de of the entire PriorityQueue). How can I do this efficiently regardless of which state backend is in use?
A better approach (which is more-or-less what is done internally by Flink's SQL and CEP libraries) is to buffer the out-of-order stream in MapState, as follows:
If you are sorting each key independently, then first key the stream. Otherwise, for a global sort, key the stream by a constant so that you can use a KeyedProcessFunction to implement the sorting.
In the open
method of that process function, instantiate a MapState object, where the keys are timestamps and the values are lists of stream elements all having the same timestamp.
In the onElement
method:
When onTimer
is called, then the entries in the map for this timestamp are ready to be released as part of the sorted stream -- because the current watermark now indicates that all earlier events should have already been processed. Don't forget to clear the entry in the map after sending the events downstream.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With