We are receiving events from a number of independent data sources, so the data arriving in our Flink topology (via Kafka) is out of order.
We create 1-minute event time windows in our Flink topology and generate event time watermarks as (current event time - some threshold (30 seconds)) at the source operator.
If a few events arrive after the set threshold, those events are simply dropped (which is fine in our case, because most of the events belonging to that minute would already have arrived and been processed in the corresponding window).
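For illustration, a watermark generation like the one described can be expressed with Flink's BoundedOutOfOrdernessTimestampExtractor; this is only a sketch, and MyEvent with its getEventTime() accessor is an assumed type:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Watermark = max event time seen so far - 30 seconds.
// MyEvent and getEventTime() are assumed names for illustration.
DataStream<MyEvent> withTimestamps = kafkaStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(30)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getEventTime();
            }
        });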
Now, the problem is that if the program crashes (for whatever reason) and is then resumed from the last successful checkpoint, out-of-order events trigger the execution of past (already processed) windows (each containing only a minuscule number of events), overwriting the results of the previous computation of those windows.
If Flink checkpointed its event time watermarks, this problem would not occur.
So, I am wondering if there is a way to enforce the checkpointing of event time watermarks in Flink...
I think the easiest solution would be to inject a ProcessFunction after the window operator. The ProcessFunction has access to the current watermark via its Context object and can store it in union operator state. In case of a failure, the ProcessFunction recovers the watermark from its state and filters out all records with timestamps smaller than the watermark (timestamps are also accessible via the Context object).
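A minimal sketch of that idea, keeping the watermark in union list state via CheckpointedFunction (class, field, and state names here are my own, not a fixed API):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: filters out records that belong to windows
// already emitted before the failure.
public class WatermarkFilter<T> extends ProcessFunction<T, T>
        implements CheckpointedFunction {

    private transient ListState<Long> watermarkState;
    private long latestWatermark = Long.MIN_VALUE;   // tracked for checkpoints
    private long restoredWatermark = Long.MIN_VALUE; // watermark at recovery time

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        // Remember the highest watermark seen so far, so it gets checkpointed.
        latestWatermark = Math.max(latestWatermark, ctx.timerService().currentWatermark());
        // Drop records with timestamps below the watermark restored after a failure.
        Long ts = ctx.timestamp();
        if (ts == null || ts >= restoredWatermark) {
            out.collect(value);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        watermarkState.clear();
        watermarkState.add(latestWatermark);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union list state: after recovery each parallel task sees all entries; take the max.
        watermarkState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("latest-watermark", Long.class));
        if (context.isRestored()) {
            for (Long wm : watermarkState.get()) {
                restoredWatermark = Math.max(restoredWatermark, wm);
            }
            latestWatermark = restoredWatermark;
        }
    }
}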
Although this is an old question, I had the same problem. The application was restarting, and a join function with an event time window was no longer triggering because events from one of the streams had finished before the crash. The join could recover its state, but since there were no more watermarks coming from that stream, events were never joined after the restart.
The solution that I found was to checkpoint the latest watermark right after the source operator. Since there is no UDF hook to persist snapshots of watermarks, I had to create my own operator that leaves the events unchanged (an identity function) and saves the latest watermark as its state. When Flink recovers from a crash, WatermarkStreamOperator.initializeState() re-emits the last watermark checkpointed in the ListState<Long> latestWatermark via the line processWatermark(new Watermark(maxWatermark)). The join with the event time window can then be triggered again.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkStreamOperator<IN> extends AbstractUdfStreamOperator<IN, WatermarkFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    /** Operator state holding the latest watermark seen before a checkpoint. */
    private ListState<Long> latestWatermark;

    public WatermarkStreamOperator(WatermarkFunction<IN> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        System.out.println("WatermarkStreamOperator.initializeState");
        super.initializeState(context);
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("latest-watermark", Long.class);
        latestWatermark = context.getOperatorStateStore().getListState(descriptor);
        List<Long> watermarkList = new ArrayList<>();
        latestWatermark.get().forEach(watermarkList::add);
        Long maxWatermark = watermarkList.stream().max(Long::compare).orElse(0L);
        if (maxWatermark != 0L) {
            // Re-emit the checkpointed watermark so downstream event time
            // windows and joins can fire again after recovery.
            System.out.println("watermarkList recovered max: " + maxWatermark);
            processWatermark(new Watermark(maxWatermark));
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element); // identity: pass elements through unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Remember the latest watermark in operator state before forwarding it.
        System.out.println("processing watermark: " + mark.getTimestamp());
        latestWatermark.update(Arrays.asList(mark.getTimestamp()));
        super.processWatermark(mark);
    }
}
And the identity UDF for the operator:
import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

public interface WatermarkFunction<T> extends Function, Serializable {
    T process(T value) throws Exception;
}
Finally, I use .transform() to call my WatermarkStreamOperator with MyTupleWatermarkFunc.
DataStream<Tuple2<String, Integer>> dataStream = env
        .addSource(new MySource(sentence))
        .transform("myStatefulWatermarkOperator",
                TypeInformation.of(String.class),
                new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()))
        ...
        ...
public class MyTupleWatermarkFunc implements WatermarkFunction<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String process(String value) throws Exception {
        return value;
    }
}
Here are the unit and integration tests that I created for this: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java