How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?
Such an operator can be implemented using a ProcessFunction.
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);
input
    // use keyBy to have keyed state.
    // NullByteKeySelector will move all data to one task. You can also use other keys
    .keyBy(new NullByteKeySelector())
    // use process function with 60 seconds timeout
    .process(new TimeOutFunction(60 * 1000));
The TimeOutFunction is defined as follows. In this example it uses processing time.
public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

    // delay after which an alert flag is thrown
    private final long timeOut;
    // state to remember the last timer set
    private transient ValueState<Long> lastTimer;

    public TimeOutFunction(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        // setup timer state
        ValueStateDescriptor<Long> lastTimerDesc =
            new ValueStateDescriptor<Long>("lastTimer", Long.class);
        lastTimer = getRuntimeContext().getState(lastTimerDesc);
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
        // get current time and compute timeout time
        long currentTime = ctx.timerService().currentProcessingTime();
        long timeoutTime = currentTime + timeOut;
        // register timer for timeout time
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        // remember timeout time
        lastTimer.update(timeoutTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
        // check if this was the last timer we registered
        if (timestamp == lastTimer.value()) {
            // it was, so no data was received afterwards.
            // fire an alert.
            out.collect(true);
        }
    }
}
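To see why the check in onTimer suppresses stale timers, here is a minimal plain-Java sketch that replays the same logic without Flink. It is only a model: the class TimeoutModel, its methods, and the sample arrival times are hypothetical names and values chosen for illustration, and a sorted set stands in for Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model (no Flink) of the "only the latest timer counts" check. */
final class TimeoutModel {

    /**
     * Replays element arrival times (milliseconds, ascending) and returns the
     * timestamps at which an alert would be emitted for the given timeout.
     */
    static List<Long> alertTimes(long[] arrivals, long timeOut) {
        TreeSet<Long> timers = new TreeSet<>(); // pending timers, earliest first
        List<Long> alerts = new ArrayList<>();
        Long lastTimer = null;                  // stands in for the ValueState

        for (long arrival : arrivals) {
            // fire every timer that is due at or before this arrival
            fireDue(timers, arrival, lastTimer, alerts);
            // same steps as processElement: register a timer and remember it
            lastTimer = arrival + timeOut;
            timers.add(lastTimer);
        }
        // the stream went silent: let all remaining timers fire
        fireDue(timers, Long.MAX_VALUE, lastTimer, alerts);
        return alerts;
    }

    private static void fireDue(TreeSet<Long> timers, long now, Long lastTimer, List<Long> alerts) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long timestamp = timers.pollFirst();
            // same check as onTimer: only the most recently registered timer alerts
            if (lastTimer != null && timestamp == lastTimer) {
                alerts.add(timestamp);
            }
        }
    }

    public static void main(String[] args) {
        // elements at 0 s, 10 s and 100 s with a 60 s timeout
        System.out.println(alertTimes(new long[] {0L, 10_000L, 100_000L}, 60_000L));
        // prints [70000, 160000]
    }
}
```

In this replay the timer registered for 60 s is ignored because a newer element had already moved the remembered timeout to 70 s. Only the timers at 70 s and 160 s match the stored value and produce an alert.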
You could set up a time window with a custom trigger function. In the trigger function, every time an event is received, the per-event callback (onElement in Flink's Trigger API) would register a processing-time timer for "currentTime + desiredTimeDelay". When a new event comes, you delete the timer that was previously set and register a new one. If no event arrives before the system time reaches the timer's timestamp, the timer fires and the window is processed. Even if no events came, the list of events that are going to be processed would just be empty.
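The delete-and-re-register idea can be sketched in plain Java without Flink. This is only a model under stated assumptions: the class ResettableTimeout and its methods are hypothetical names, and overwriting a single field stands in for what a real trigger would do with deleteProcessingTimeTimer followed by registerProcessingTimeTimer on its TriggerContext.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no Flink) of a trigger that replaces its timer on every element. */
final class ResettableTimeout {

    private final long delay;
    private Long pendingTimer; // at most one timer is registered at any time

    ResettableTimeout(long delay) {
        this.delay = delay;
    }

    /** Models the per-element callback: drop the old timer and set a new one. */
    void onElement(long now) {
        // overwriting the field models "delete previous timer, register new timer"
        pendingTimer = now + delay;
    }

    /** Advances the clock; returns the fire time if the pending timer is due, else null. */
    Long advanceTo(long now) {
        if (pendingTimer != null && pendingTimer <= now) {
            Long fired = pendingTimer;
            pendingTimer = null;
            return fired;
        }
        return null;
    }

    /** Replays ascending arrival times (milliseconds) and collects all fire times. */
    static List<Long> fireTimes(long[] arrivals, long delay) {
        ResettableTimeout trigger = new ResettableTimeout(delay);
        List<Long> fired = new ArrayList<>();
        for (long arrival : arrivals) {
            Long f = trigger.advanceTo(arrival);
            if (f != null) {
                fired.add(f);
            }
            trigger.onElement(arrival);
        }
        // the stream went silent: the last pending timer eventually fires
        Long f = trigger.advanceTo(Long.MAX_VALUE);
        if (f != null) {
            fired.add(f);
        }
        return fired;
    }

    public static void main(String[] args) {
        // elements at 0 s, 10 s and 100 s with a 60 s delay
        System.out.println(fireTimes(new long[] {0L, 10_000L, 100_000L}, 60_000L));
        // prints [70000, 160000]
    }
}
```

Because the previous timer is removed whenever an element arrives, at most one timer is pending and no staleness check is needed when it fires. The trade-off is that the timestamp of the pending timer has to be tracked so that it can be deleted.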