Apache Flink CEP Pattern operation for NOT followedBy

I have a scenario where I have to change the state if a second event does not follow the first event within x seconds. For example, if a user does not log out within 100 minutes, the user should be considered to be in an invalid state. How can this be designed using the current pattern operations?

Sowmya V asked Apr 12 '16 12:04



2 Answers

Since this has now been implemented, I am answering this question for anyone who comes here looking for a solution.

As of Flink 1.0.0, this can be done by handling timed-out patterns. For example, suppose your CEP pattern looks something like this:

The example is partially taken from the Flink website. (There are some major changes between 1.2 and 1.3; please adjust your code accordingly. This answer focuses on 1.3.)

Pattern description: get a first event of type "error", followed by a second event of type "critical" within 10 seconds.

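import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Either;

// An "error" event followed by a "critical" event, both within 10 seconds.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("error");
    }
}).followedBy("end").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("critical");
    }
}).within(Time.seconds(10));

// inputStream is expected to be a DataStream<Event>.
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

// Left holds timed-out partial matches, Right holds complete matches.
DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() {
    @Override
    public String timeout(Map<String, List<Event>> map, long timeoutTimestamp) throws Exception {
        // Called with the partial match and the timestamp at which it timed out.
        return map.toString() + " @ " + timeoutTimestamp;
    }
}, new PatternSelectFunction<Event, String>() {
    @Override
    public String select(Map<String, List<Event>> map) throws Exception {
        // Called with every complete match.
        return map.toString();
    }
});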

In the case from the question, if the user does not log out within 100 minutes, the corresponding logout event never arrives. The pattern therefore times out, and the partial match (the initiating event) is passed to the PatternTimeoutFunction.
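To make the timeout semantics concrete, here is a minimal plain-Java sketch of the login/logout case. It does not use Flink and is not how FlinkCEP is implemented. The class LogoutTimeoutSketch, the UserEvent type, and the method findTimedOutUsers are hypothetical names chosen for illustration, and the sketch assumes events arrive ordered by timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain Java, no Flink. All names here are hypothetical.
final class LogoutTimeoutSketch {

    // Hypothetical event: a user id, an event name ("login" or "logout"), and a timestamp.
    static final class UserEvent {
        final String userId;
        final String name;
        final long timestampMillis;

        UserEvent(String userId, String name, long timestampMillis) {
            this.userId = userId;
            this.name = name;
            this.timestampMillis = timestampMillis;
        }
    }

    // Returns the users whose "login" was not followed by a "logout" within windowMillis.
    // Assumes events are ordered by timestamp; endOfStreamMillis is the final clock value.
    static List<String> findTimedOutUsers(List<UserEvent> events, long windowMillis, long endOfStreamMillis) {
        Map<String, Long> openLogins = new LinkedHashMap<>(); // userId -> pending login time
        List<String> timedOut = new ArrayList<>();
        for (UserEvent e : events) {
            // Each event advances the clock, so expire stale logins first.
            expire(openLogins, e.timestampMillis, windowMillis, timedOut);
            if (e.name.equals("login")) {
                openLogins.putIfAbsent(e.userId, e.timestampMillis); // keep the earliest pending login
            } else if (e.name.equals("logout")) {
                openLogins.remove(e.userId); // the "second event" arrived in time
            }
        }
        expire(openLogins, endOfStreamMillis, windowMillis, timedOut);
        return timedOut;
    }

    // Moves every pending login that is at least windowMillis old into timedOut.
    private static void expire(Map<String, Long> openLogins, long now, long windowMillis, List<String> timedOut) {
        Iterator<Map.Entry<String, Long>> it = openLogins.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= windowMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        long window = 100L * 60L * 1000L; // 100 minutes
        List<UserEvent> events = Arrays.asList(
            new UserEvent("alice", "login", 0L),
            new UserEvent("bob", "login", 1_000L),
            new UserEvent("alice", "logout", 5_000L));
        System.out.println(findTimedOutUsers(events, window, window + 1_000L));
    }
}
```

In this sketch the timed-out list plays the role of the partial matches handed to the timeout function: only the initiating login is known, because the logout never came. In a real Flink job the clock would be driven by event-time watermarks or processing time rather than by an explicit end-of-stream value.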

Biplob Biswas answered Nov 09 '22 16:11


At the moment, this is not possible. The solution would be a timeout handler that is triggered whenever an event sequence is discarded because it falls outside the defined time window. There is already a JIRA issue that tracks the timeout handler implementation.

Till Rohrmann answered Nov 09 '22 17:11