
Checkpointing Event Time Watermarks in Flink

We are receiving events from a number of independent data sources, so the data arriving in our Flink topology (via Kafka) is out of order.

We are creating 1-min event time windows in our Flink topology and generating event time watermarks as (current event time - some threshold (30 seconds)) at the source operator.

In case a few events arrive after the set threshold, those events are simply ignored (which is ok in our case, because most of the events belonging to that minute would have already arrived and got processed in the corresponding window).
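The watermark rule and the late-event behaviour described above can be sketched in plain Java, with no Flink dependency, so that only the arithmetic is visible. The class and method names (`BoundedLatenessSketch`, `onEvent`, `isLate`) are illustrative assumptions, not Flink API, and the sketch assumes the watermark is derived from the highest event time seen so far. In recent Flink versions a similar policy is available as `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(30))`.

```java
/** Illustrative sketch only: mimics "watermark = max event time seen - 30 s". */
final class BoundedLatenessSketch {
    static final long THRESHOLD_MS = 30_000L; // the 30-second threshold from the question
    static final long WINDOW_MS = 60_000L;    // 1-minute tumbling event time windows

    private long maxEventTime = Long.MIN_VALUE;

    /** Records an event timestamp and returns the watermark after seeing it. */
    long onEvent(long eventTimeMs) {
        maxEventTime = Math.max(maxEventTime, eventTimeMs);
        return currentWatermark();
    }

    /** Watermark trails the highest event time by the threshold; MIN_VALUE before any event. */
    long currentWatermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - THRESHOLD_MS;
    }

    /** Start of the 1-minute window that contains the given timestamp. */
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
    }

    /** An event is late once the watermark has reached the last timestamp of its window. */
    boolean isLate(long eventTimeMs) {
        long windowMaxTimestamp = windowStart(eventTimeMs) + WINDOW_MS - 1;
        return windowMaxTimestamp <= currentWatermark();
    }
}
```

With an event at 100 s the watermark is 70 s, so an event stamped 59 s (window 0 to 60 s) is late and dropped, while one stamped 61 s still lands in the open 60 to 120 s window.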

Now, the problem is that if the program crashes (for whatever reason) and is then resumed from the last successful checkpoint, events arriving out of order trigger execution of past (already processed) windows. Those windows now contain only a handful of events, and their results override the results of the previous computation for the same window.

In case Flink had checkpointed event time watermarks, this problem would not have occurred.
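To make the failure mode concrete, here is a toy model in plain Java (no Flink dependency). It assumes, as the question implies, that an operator's current watermark is not restored from the checkpoint and starts again from `Long.MIN_VALUE`. The class `WatermarkRestartSketch` and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only: a toy operator that accepts or drops events for 1-minute windows. */
final class WatermarkRestartSketch {
    static final long WINDOW_MS = 60_000L;

    long currentWatermark; // in this model, NOT carried over across a restart
    final List<Long> acceptedTimestamps = new ArrayList<>();

    WatermarkRestartSketch(long initialWatermark) {
        this.currentWatermark = initialWatermark;
    }

    /** Watermarks only move forward. */
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Returns true if the event is added to a window, false if it is dropped as late. */
    boolean onEvent(long eventTimeMs) {
        long windowEnd = eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS) + WINDOW_MS;
        boolean late = windowEnd - 1 <= currentWatermark;
        if (!late) {
            acceptedTimestamps.add(eventTimeMs);
        }
        return !late;
    }

    public static void main(String[] args) {
        // Before the crash: the watermark has passed the first window, so a straggler is dropped.
        WatermarkRestartSketch beforeCrash = new WatermarkRestartSketch(Long.MIN_VALUE);
        beforeCrash.onWatermark(90_000L);
        System.out.println("before crash accepted: " + beforeCrash.onEvent(30_000L));

        // After restore without a watermark: the same straggler re-opens the old window.
        WatermarkRestartSketch restored = new WatermarkRestartSketch(Long.MIN_VALUE);
        System.out.println("restored accepted: " + restored.onEvent(30_000L));

        // Hypothetical: if the watermark had been checkpointed, the straggler would stay dropped.
        WatermarkRestartSketch withWatermark = new WatermarkRestartSketch(90_000L);
        System.out.println("with checkpointed watermark accepted: " + withWatermark.onEvent(30_000L));
    }
}
```

The second case is the one the question describes: the straggler is accepted into a fresh window for minute 0, and when that window fires it holds only that event.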

So, I am wondering if there is a way to enforce event time watermarks' checkpointing in Flink...

Vijay Kansal asked Nov 08 '22 10:11


2 Answers

I think the easiest solution would be to inject a ProcessFunction after the window operator.

The ProcessFunction has access to the current watermark via its Context object and can store it in union operator state. In case of a failure, the ProcessFunction recovers the watermark from its state and filters out all records with timestamps smaller than the watermark (timestamps are also accessible via the Context object).
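The filtering step of this approach can be sketched in plain Java, leaving out the Flink wiring. `RecoveredWatermarkFilter`, `restore` and `shouldEmit` are hypothetical names. With union state, each parallel instance receives every checkpointed watermark on recovery; taking the maximum of them is an assumption of this sketch (taking the minimum would instead err on the side of re-emitting some results).

```java
import java.util.List;

/** Illustrative sketch only: the filter a ProcessFunction after the window operator could apply. */
final class RecoveredWatermarkFilter {
    // Stays at MIN_VALUE on a fresh start, so nothing is filtered without a recovery.
    private long restoredWatermark = Long.MIN_VALUE;

    /**
     * Called on recovery with all watermarks found in the (union) state.
     * Assumption: keep the highest one, so no result older than it is emitted again.
     */
    void restore(List<Long> checkpointedWatermarks) {
        for (long watermark : checkpointedWatermarks) {
            restoredWatermark = Math.max(restoredWatermark, watermark);
        }
    }

    /** Records with a timestamp smaller than the restored watermark are suppressed. */
    boolean shouldEmit(long recordTimestamp) {
        return recordTimestamp >= restoredWatermark;
    }
}
```

In an actual job the list passed to `restore` would come from the operator state on recovery, and `shouldEmit` would be evaluated with the record timestamp taken from the Context.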

Fabian Hueske answered Dec 13 '22 05:12


Although this is an old question, I had the same problem. My application was restarting, and the join function with an event time window no longer triggered because events from one of the streams had finished before the crash. The join could recover its state, but since no more watermarks arrived from one of the streams, events were never joined after the restart.

The solution I found was to checkpoint the latest watermark just after the source operator. Since there is no UDF that persists snapshots of watermarks, I had to create my own operator that passes events through unchanged (an identity function) and saves the latest watermark as its state. When Flink recovers from a crash, WatermarkStreamOperator.initializeState() reads the last checkpointed watermark from the ListState<Long> latestWatermark and re-emits it on the line processWatermark(new Watermark(maxWatermark)). The join with the event time window can then be triggered.

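import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Identity operator: forwards every element unchanged and keeps the latest watermark in operator state.
public class WatermarkStreamOperator<IN> extends AbstractUdfStreamOperator<IN, WatermarkFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {
    private static final long serialVersionUID = 1L;
    private ListState<Long> latestWatermark;

    public WatermarkStreamOperator(WatermarkFunction<IN> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        System.out.println("WatermarkStreamOperator.initializeState");
        super.initializeState(context);
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("latest-watermark", Long.class);
        latestWatermark = context.getOperatorStateStore().getListState(descriptor);
        List<Long> watermarkList = new ArrayList<>();
        latestWatermark.get().forEach(watermarkList::add);
        // 0 acts as the "nothing restored" marker, e.g. on a fresh start.
        long maxWatermark = watermarkList.stream().max(Long::compare).orElse(0L);
        if (maxWatermark != 0L) {
            System.out.println("watermarkList recovered max: " + maxWatermark);
            // Re-emit the last checkpointed watermark downstream.
            processWatermark(new Watermark(maxWatermark));
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Identity: pass the record through untouched.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        System.out.println("processing watermark: " + mark.getTimestamp());
        // Keep only the most recent watermark in state.
        latestWatermark.update(Arrays.asList(mark.getTimestamp()));
        super.processWatermark(mark);
    }
}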

And the identity UDF for the operator:

public interface WatermarkFunction<T> extends Function, Serializable {
    T process(T value) throws Exception;
}

Finally I use the .transform() to call my WatermarkStreamOperator with MyTupleWatermarkFunc.

DataStream<Tuple2<String, Integer>> dataStream = env
        .addSource(new MySource(sentence))
        .transform("myStatefulWatermarkOperator",
                TypeInformation.of(String.class),
                new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()))
        ...
        ...

public class MyTupleWatermarkFunc implements WatermarkFunction<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String process(String value) throws Exception {
        return value;
    }
}

Here are the unit and integration tests that I created for this: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java

Felipe answered Dec 13 '22 05:12