Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

apache flink 0.10 how to get the first occurence of a composite key from an unbounded input dataStream?

I am a newbie with apache flink. I have an unbound data stream in my input (fed into flink 0.10 via kakfa).

I want to get the 1st occurence of each primary key (the primary key is the contract_num and the event_dt).
These "duplicates" occur nearly immediately after each other. The source system cannot filter this for me, so flink has to do it.

Here is my input data:

contract_num, event_dt, attr 
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Here is the output data I want:

A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Note the 2nd row has been removed as the key combination of A001 and '2016-02-24 10:25:08' already occurred in the 1st row.

How can I do this with flink 0.10?

I was thinking about using keyBy(0,1) but after that I don't know what to do!

(I used joda-time and org.flinkspector to setup these tests).

@Test
public void test() {
    DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
    DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
    DateTime oneSecondsAgo = (new DateTime()).minusSeconds(2);

    DataStream<Tuple3<String, Date, String>> testStream =
            createTimedTestStreamWith(
                    Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
            .emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
            .close();
    
    testStream.keyBy(0,1);
}
like image 305
timmy_stapler Avatar asked Feb 24 '16 10:02

timmy_stapler


People also ask

What can you do with the Flink API?

It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data. This page describes the API calls available in Flink CEP.

What is the Flink CEP library?

FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data. This page describes the API calls available in Flink CEP.

What is flinkcep?

FlinkCEP - Complex event processing for Flink # FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.

What are the different types of contiguity in flinkcep?

FlinkCEP supports the following forms of contiguity between events: Strict Contiguity: Expects all matching events to appear strictly one after the other, without any non-matching events in-between. Relaxed Contiguity: Ignores non-matching events appearing in-between the matching ones.


1 Answers

Filtering duplicates over an infinite stream will eventually fail if your key space is larger than your available storage space. The reason is that you have to store the already seen keys somewhere to filter out the duplicates. Thus, it would be good to define a time window after which you can purge the current set of seen keys.

If you're aware of this problem but want to try it anyway, you can do it by applying a stateful flatMap operation after the keyBy call. The stateful mapper uses Flink's state abstraction to store whether it has already seen an element with this key or not. That way, you will also benefit from Flink's fault tolerance mechanism because your state will be automatically checkpointed.

A Flink program doing your job could look like

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, Date, String>> input = env.fromElements(Tuple3.of("foo", new Date(1000), "bar"), Tuple3.of("foo", new Date(1000), "foobar"));

    input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();

    env.execute("Test");
}

where the implementation of DuplicateFilter depends on the version of Flink.

Version >= 1.0 implementation

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
    private ValueState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}

Version 0.10 implementation

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    private OperatorState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            operatorState.update(true);
        }
    }
}

Update: Using a tumbling time window

input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(new WindowFunction<Iterable<Tuple3<String,Date,String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
        out.collect(input.iterator().next());
    }
})
like image 107
Till Rohrmann Avatar answered Sep 22 '22 02:09

Till Rohrmann