 

How to avoid repeated tuples in a Flink sliding window join?

Tags:

apache-flink

For example, there are two streams. One is the advertisements shown to users; its tuples can be described as (advertiseId, shown timestamp). The other is the click stream: (advertiseId, clicked timestamp). We want a joined stream that contains every advertisement a user clicked within 20 minutes after it was shown. My solution is to join the two streams on a SlidingTimeWindow, but the joined stream contains many repeated tuples. How can I get each joined tuple only once in the new stream?

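// 30-minute windows that slide every 10 seconds, so consecutive windows overlap
stream1.join(stream2)
        .where(0)
        .equalTo(0)
        .window(SlidingTimeWindows.of(Time.of(30, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
        .apply(joinFunction); // joinFunction: a JoinFunction not shown in the question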
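Why the duplicates appear can be checked with a little arithmetic: with a 30-minute window sliding every 10 seconds, every element belongs to 30 × 60 / 10 = 180 overlapping windows, and a window join emits a matching (show, click) pair once for every window that contains both. The plain-Java sketch below (no Flink dependency; the class and method names are made up for illustration) reproduces that window assignment, assuming millisecond timestamps and a window offset of 0.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDuplicates {

    // Start times of every sliding window [start, start + size) that contains ts.
    // Assumes a window offset of 0 and millisecond timestamps.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // How many windows contain both timestamps, i.e. how often a window join emits the pair.
    static int pairEmissions(long showTs, long clickTs, long size, long slide) {
        List<Long> shared = windowStarts(showTs, size, slide);
        shared.retainAll(windowStarts(clickTs, size, slide));
        return shared.size();
    }

    public static void main(String[] args) {
        long size = 30 * 60 * 1000L; // 30 minutes, as in the question
        long slide = 10 * 1000L;     // 10 seconds, as in the question
        System.out.println(windowStarts(0L, size, slide).size());           // 180
        System.out.println(pairEmissions(0L, 5 * 60 * 1000L, size, slide)); // 150
    }
}
```

So a click 5 minutes after its show is emitted 150 times by the original join; only a non-overlapping assignment (slide equal to size) brings that down to at most once.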
Jun asked Nov 21 '15 22:11


2 Answers

Solution 1:

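Extend Flink so that it can join two streams that use different windows, as Spark Streaming allows. Stock Flink does not offer this; MultiWindowsJoinedStreams below is one of the two custom join operators from the Flink-stream-join repository mentioned at the end of this answer. In this case, apply SlidingTimeWindows (21 min size, 1 min slide) to the advertisement stream and TumblingTimeWindows (1 min) to the click stream, then join the two windowed streams.

The tumbling windows avoid duplicate records in the joined stream, because each click belongs to exactly one window. The 21-minute sliding windows ensure that no legal click is missed. One issue is that some illegal clicks (more than 20 minutes after the show) will also appear in the joined stream; adding a filter fixes this easily.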

// MultiWindowsJoinedStreams is part of the Flink-stream-join project, not of Flink itself.
MultiWindowsJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
        new MultiWindowsJoinedStreams<>(advertisement, click);

DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams
        .where(keySelector)
        // advertisements: 21-minute windows sliding every minute
        .window(SlidingTimeWindows.of(Time.of(21, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES)))
        .equalTo(keySelector)
        // clicks: 1-minute tumbling windows, so each click is joined in exactly one window
        .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
        .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
            private static final long serialVersionUID = -3625150954096822268L;

            @Override
            public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                // (advertiseId, shown timestamp, clicked timestamp)
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        });

// Drop clicks that happened before the show or more than 20 minutes after it.
joinedStream = joinedStream.filter(new FilterFunction<Tuple3<String, Long, Long>>() {
    private static final long serialVersionUID = -4325256210808325338L;

    @Override
    public boolean filter(Tuple3<String, Long, Long> value) throws Exception {
        long maxDelay = TimeUnit.MINUTES.toMillis(20); // timestamps are in milliseconds
        return value.f1 < value.f2 && value.f1 + maxDelay >= value.f2;
    }
});
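To see why this combination emits each pair at most once without losing legal clicks, here is a plain-Java simulation. It assumes (the answer does not spell this out) that MultiWindowsJoinedStreams pairs each 1-minute click window with the advertisement window that ends at the same time; all names are hypothetical and timestamps are in milliseconds. Each click falls into exactly one tumbling window, so it is compared with the advertisements only once. A 21-minute advertisement window that ends less than one minute after the click always reaches back at least 20 minutes, so every legal show is inside it, and the filter then removes clicks that are 20 to 21 minutes late.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiWindowJoinSketch {
    static final long MINUTE = 60_000L;
    static final long AD_WINDOW = 21 * MINUTE;   // sliding window size on the advertisement stream
    static final long CLICK_WINDOW = 1 * MINUTE; // tumbling window size on the click stream
    static final long MAX_DELAY = 20 * MINUTE;   // a click counts if it is at most 20 min after the show

    record Event(String adId, long ts) {}
    record Joined(String adId, long showTs, long clickTs) {}

    static List<Joined> join(List<Event> ads, List<Event> clicks) {
        List<Joined> out = new ArrayList<>();
        for (Event c : clicks) {
            // End of the single tumbling window that contains this click.
            long end = Math.floorDiv(c.ts(), CLICK_WINDOW) * CLICK_WINDOW + CLICK_WINDOW;
            // Assumed pairing: the advertisement window [end - 21 min, end).
            long start = end - AD_WINDOW;
            for (Event a : ads) {
                boolean sameKey = a.adId().equals(c.adId());
                boolean inAdWindow = a.ts() >= start && a.ts() < end;
                // The filter from the answer: after the show, at most 20 minutes later.
                boolean legal = a.ts() < c.ts() && a.ts() + MAX_DELAY >= c.ts();
                if (sameKey && inAdWindow && legal) {
                    out.add(new Joined(a.adId(), a.ts(), c.ts()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> ads = List.of(new Event("ad1", 0L), new Event("ad2", 0L));
        List<Event> clicks = List.of(
                new Event("ad1", 20 * MINUTE),            // exactly 20 minutes: kept
                new Event("ad2", 20 * MINUTE + 30_000L)); // in the 21-minute window, but filtered out
        join(ads, clicks).forEach(System.out::println);   // prints only the ad1 pair
    }
}
```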

Solution 2:

A join can also be done without windows. A custom join operator that implements the TwoInputStreamOperator interface keeps a time-bounded buffer for each of the two streams and emits a single joined stream.

// env, properties, keySelector and the timestamp extractors are defined elsewhere (not shown).
// Advertisement stream from Kafka; each record is "<advertiseId> <timestamp>".
DataStream<Tuple2<String, Long>> advertisement = env
        .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            private static final long serialVersionUID = -6564495005753073342L;

            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
            }
        }).keyBy(keySelector).assignTimestamps(timestampExtractor1);

// Click stream, parsed the same way.
DataStream<Tuple2<String, Long>> click = env
        .addSource(new FlinkKafkaConsumer082<String>("click", new SimpleStringSchema(), properties))
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            private static final long serialVersionUID = -6564495005753073342L;

            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
            }
        }).keyBy(keySelector).assignTimestamps(timestampExtractor2);

// NoWindowJoinedStreams is the second custom operator from the Flink-stream-join project.
NoWindowJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
        new NoWindowJoinedStreams<>(advertisement, click);
DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams
        .where(keySelector)
        .buffer(Time.of(20, TimeUnit.SECONDS))  // buffer length for advertisements
        .equalTo(keySelector)
        .buffer(Time.of(5, TimeUnit.SECONDS))   // buffer length for clicks
        .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
            private static final long serialVersionUID = -5075871109025215769L;

            @Override
            public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                // (advertiseId, shown timestamp, clicked timestamp)
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        });

I implemented two new join operators based on the TwoInputTransformation of the Flink streaming API. See the Flink-stream-join repository for the code; I will add more tests to it.
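The two-buffer idea can be sketched without Flink. This is not the Flink-stream-join code, whose internals are not shown here; the class name, the `maxLateness` bound on out-of-order arrival and the eviction rule are assumptions for illustration, and timestamps are in milliseconds. Each pair is emitted exactly once: when the later of the two elements arrives, the earlier one is already in the other buffer, while the earlier one could not see the later one when it arrived. Newer Flink releases (1.6 and later) ship this pattern as the interval join, `KeyedStream#intervalJoin(...).between(lower, upper).process(...)`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferedStreamJoin {
    record Joined(String adId, long showTs, long clickTs) {}

    private final long maxDelay;    // legal pair: showTs < clickTs <= showTs + maxDelay
    private final long maxLateness; // assumed bound on how far out of order events arrive
    private final Map<String, Deque<Long>> shows = new HashMap<>();
    private final Map<String, Deque<Long>> clicks = new HashMap<>();
    private long newest = Long.MIN_VALUE; // largest timestamp seen on either input
    final List<Joined> output = new ArrayList<>();

    BufferedStreamJoin(long maxDelay, long maxLateness) {
        this.maxDelay = maxDelay;
        this.maxLateness = maxLateness;
    }

    // First input: probe the buffered clicks, then buffer the show.
    void onShow(String adId, long ts) {
        evict(ts);
        for (long clickTs : clicks.getOrDefault(adId, new ArrayDeque<>())) {
            emitIfLegal(adId, ts, clickTs);
        }
        shows.computeIfAbsent(adId, k -> new ArrayDeque<>()).addLast(ts);
    }

    // Second input: probe the buffered shows, then buffer the click.
    void onClick(String adId, long ts) {
        evict(ts);
        for (long showTs : shows.getOrDefault(adId, new ArrayDeque<>())) {
            emitIfLegal(adId, showTs, ts);
        }
        clicks.computeIfAbsent(adId, k -> new ArrayDeque<>()).addLast(ts);
    }

    private void emitIfLegal(String adId, long showTs, long clickTs) {
        if (showTs < clickTs && clickTs <= showTs + maxDelay) {
            output.add(new Joined(adId, showTs, clickTs));
        }
    }

    // Every future element is at most maxLateness behind `newest`, so anything older than
    // newest - maxLateness - maxDelay can no longer find a partner and is dropped.
    private void evict(long ts) {
        newest = Math.max(newest, ts);
        long horizon = newest - maxLateness - maxDelay;
        shows.values().forEach(q -> q.removeIf(t -> t < horizon));
        clicks.values().forEach(q -> q.removeIf(t -> t < horizon));
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        BufferedStreamJoin join = new BufferedStreamJoin(20 * minute, 5_000L);
        join.onShow("ad1", 0L);
        join.onClick("ad2", 60_000L);     // this click arrives 2 s before its show...
        join.onShow("ad2", 58_000L);      // ...and is still joined when the show arrives
        join.onClick("ad1", 5 * minute);  // legal: joined once
        join.onClick("ad1", 25 * minute); // too late: the show has been evicted
        join.output.forEach(System.out::println);
    }
}
```

Buffer sizes are the design knob here: a longer buffer tolerates more disorder but holds more state per key.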

Jun answered Oct 19 '22 20:10


In your code, you defined an overlapping sliding window (the slide is smaller than the window size). If you don't want duplicates, define a non-overlapping window by specifying only the window size (the default slide equals the window size).
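Non-overlapping (tumbling) windows do remove the duplicates, because each element is assigned to exactly one window. The trade-off is that a show and a click on opposite sides of a window boundary are never joined, even if they are only seconds apart. A plain-Java sketch of tumbling-window assignment (hypothetical names, window offset 0, millisecond timestamps) makes this visible:

```java
public class TumblingWindowTradeoff {

    // Start of the single tumbling window [start, start + size) that contains ts.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // A tumbling-window join pairs two elements only if they share a window, and then exactly once.
    static boolean joined(long showTs, long clickTs, long size) {
        return windowStart(showTs, size) == windowStart(clickTs, size);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 30 * minute;
        System.out.println(joined(1 * minute, 5 * minute, size));   // true: same window
        System.out.println(joined(29 * minute, 31 * minute, size)); // false: split by the 30-minute boundary
    }
}
```

This boundary gap is what the 21-minute sliding window on the advertisement side in the first answer is meant to close.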

Matthias J. Sax answered Oct 19 '22 21:10