
How to sort an out-of-order event time stream using Flink

This question covers how to sort an out-of-order stream using Flink SQL, but I would rather use the DataStream API. One solution is a ProcessFunction that uses a PriorityQueue to buffer events until the watermark indicates they are no longer out of order, but this performs poorly with the RocksDB state backend, because each access to the PriorityQueue requires serializing and deserializing the entire queue. How can I do this efficiently regardless of which state backend is in use?

David Anderson asked Mar 04 '23 00:03


1 Answer

A better approach (which is more-or-less what is done internally by Flink's SQL and CEP libraries) is to buffer the out-of-order stream in MapState, as follows:

If you are sorting each key independently, then first key the stream. Otherwise, for a global sort, key the stream by a constant so that you can use a KeyedProcessFunction to implement the sorting.

In the open method of that process function, instantiate a MapState object, where the keys are timestamps and the values are lists of stream elements all having the same timestamp.

In the processElement method:

  • If an event is late, either drop it or send it to a side output
  • Otherwise, append the event to the map entry corresponding to its timestamp
  • Register an event time timer for this event's timestamp

When onTimer is called, the map entry for the timer's timestamp is ready to be released as part of the sorted stream, because the current watermark now indicates that all earlier events should already have been processed. Don't forget to clear the entry in the map after sending the events downstream.
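
The steps above can be modeled in plain Java without any Flink dependency. This is only a sketch of the buffering logic, not Flink code: the class SortBufferSketch and its advanceWatermark method are hypothetical names, a HashMap stands in for MapState, and a TreeSet stands in for the event time timer service. It assumes an event counts as late when its timestamp is at or below the current watermark. In a real job the same logic would live in the processElement and onTimer methods of a KeyedProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Plain-Java model of the sorting buffer (hypothetical class, not part of Flink). */
class SortBufferSketch<T> {
    // Stands in for MapState: timestamp -> all buffered events with that timestamp.
    private final Map<Long, List<T>> buffer = new HashMap<>();
    // Stands in for the timer service; a set, so one timer per distinct timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;
    private final List<T> sorted = new ArrayList<>(); // models the main output
    private final List<T> late = new ArrayList<>();   // models a side output

    /** Models processElement: divert late events, otherwise buffer and set a timer. */
    void processElement(long timestamp, T event) {
        if (timestamp <= currentWatermark) {
            late.add(event); // late: could also simply be dropped
            return;
        }
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
        timers.add(timestamp);
    }

    /** Models a watermark arriving: fire every due timer in timestamp order. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** Models onTimer: emit the entry for this timestamp, then clear it. */
    private void onTimer(long timestamp) {
        List<T> ready = buffer.remove(timestamp); // remove() also clears the entry
        if (ready != null) {
            sorted.addAll(ready);
        }
    }

    List<T> sorted() { return sorted; }
    List<T> late() { return late; }
}
```

Only the single map entry for a given timestamp is read or written per event or timer, never the whole buffer, which is what avoids the cost described in the question for the PriorityQueue approach. Events that share a timestamp are emitted in arrival order.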

David Anderson answered Mar 15 '23 22:03
