I have a scenario where I need to change the state if a second event does not follow the first event within x seconds. For example, if a user did not log out within 100 minutes, consider them to be in an invalid state. How can this be designed using the current pattern operations?
FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what's important in your data.
As this has been implemented already, I thought of answering this question for those who are coming here looking for answers.
As of Flink 1.0.0, this can be done by handling timed-out patterns. For example, suppose your CEP pattern looks like this:
Example partially from the Flink website (there are some major changes between 1.2 and 1.3, so please adjust your code accordingly; this answer focuses on 1.3).
Pattern description: get a first event of type "error", followed by a second event of type "critical" within 10 seconds.
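import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Either;

// "start" matches an "error" event; "end" matches a later "critical" event within 10 seconds.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

// inputStream is the DataStream<Event> the pattern is applied to.
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

// Left values are timed-out partial matches; Right values are complete matches.
DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() {
    @Override
    public String timeout(Map<String, List<Event>> map, long l) throws Exception {
        // l is the timestamp at which the partial match timed out.
        return map.toString() + " @ " + l;
    }
}, new PatternSelectFunction<Event, String>() {
    @Override
    public String select(Map<String, List<Event>> map) throws Exception {
        return map.toString();
    }
});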
For the case in the question, the window would be 100 minutes. If the user doesn't log out within that time, the corresponding event never arrives, so the pattern times out and the partial match (the initiating event) is passed to the PatternTimeoutFunction.
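To make the timeout semantics concrete, here is a minimal plain-Java sketch of the same idea for the login/logout case. It does not use Flink at all: the `UserEvent` type, the `timedOutUsers` helper, and the `watermarkMs` parameter (standing in for "how far event time has advanced") are all hypothetical names made up for illustration, and treating a gap of exactly 100 minutes as timed out is this sketch's own choice. In FlinkCEP the same bookkeeping is done for you by `within(...)` plus the timeout function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LoginTimeoutSketch {

    /** Hypothetical event: user id, event name ("login" or "logout"), event time in ms. */
    static final class UserEvent {
        final String userId;
        final String name;
        final long timestampMs;

        UserEvent(String userId, String name, long timestampMs) {
            this.userId = userId;
            this.name = name;
            this.timestampMs = timestampMs;
        }
    }

    /** 100 minutes, as in the question. */
    static final long TIMEOUT_MS = 100L * 60L * 1000L;

    /**
     * Returns the users whose "login" was not followed by a "logout" within timeoutMs.
     * Events are assumed to be ordered by timestamp; watermarkMs is the current event time.
     */
    static List<String> timedOutUsers(List<UserEvent> events, long timeoutMs, long watermarkMs) {
        Map<String, Long> openLogins = new LinkedHashMap<>(); // partial matches: login seen, no logout yet
        List<String> timedOut = new ArrayList<>();
        for (UserEvent e : events) {
            if (e.name.equals("login")) {
                openLogins.putIfAbsent(e.userId, e.timestampMs);
            } else if (e.name.equals("logout")) {
                Long loginTs = openLogins.remove(e.userId);
                if (loginTs != null && e.timestampMs - loginTs >= timeoutMs) {
                    timedOut.add(e.userId); // the logout arrived, but too late
                }
            }
        }
        // Logins still open once time has passed their deadline are the "timed-out partial matches".
        for (Map.Entry<String, Long> open : openLogins.entrySet()) {
            if (watermarkMs - open.getValue() >= timeoutMs) {
                timedOut.add(open.getKey());
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        List<UserEvent> events = Arrays.asList(
            new UserEvent("alice", "login", 0L),
            new UserEvent("bob", "login", 1_000L),
            new UserEvent("alice", "logout", 1_800_000L)); // alice logs out after 30 minutes
        // Event time has advanced to 200 minutes; bob never logged out.
        System.out.println(timedOutUsers(events, TIMEOUT_MS, 12_000_000L)); // prints [bob]
    }
}
```

Note that a still-open login is only reported once time has advanced past its deadline; before that, it is simply a pending partial match.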
At the moment this is not possible. The solution would be a timeout handler that is triggered whenever an event sequence is discarded because it falls out of the defined time window. There is already a JIRA issue that tracks the timeout handler implementation.