 

Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>

How can I create a unit test for a stateful process function? I have something like this:

private static SingleOutputStreamOperator<Tuple> methodName(KeyedStream<Event, String> stream) {
    return stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new ProcessFunction());
}

and

ProcessFunction extends ProcessWindowFunction<IN, OUT, KEY, W>

All the test harness examples I've found on the Flink documentation page use a KeyedProcessFunction, which is not my case. Thanks!

asked Oct 16 '25 at 17:10 by Alter


1 Answer

I found a solution inspired by this method: https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L1340

In my case, I have to test a tumbling processing-time window (TumblingProcessingTimeWindows) whose process() operator uses a ProcessWindowFunction to count words while keeping the count from previous windows (i.e. the count is not reset each time the window is triggered).

WordCountPojo is a simple POJO with two fields, word and count (you can use a Tuple2 instead if you prefer).
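The answer does not show WordCountPojo itself. A minimal sketch, assuming an int count and plain getters/setters, could look like this. The detail that matters for the test is equals(): TestHarnessUtil compares the emitted StreamRecords, and StreamRecord.equals() delegates to the wrapped value, so without value equality the assertions would fail even when the counts are right.

```java
import java.util.Objects;

// Hypothetical WordCountPojo (the answer only says it has a word and a count).
// Flink treats a class as a POJO if it is public, has a public no-arg
// constructor, and its fields are public or exposed through getters/setters.
public class WordCountPojo {
    private String word;
    private int count;

    public WordCountPojo() {} // needed by Flink's POJO serializer

    public WordCountPojo(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Value equality, so two records with the same word and count compare equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WordCountPojo)) return false;
        WordCountPojo other = (WordCountPojo) o;
        return count == other.count && Objects.equals(word, other.word);
    }

    @Override
    public int hashCode() { return Objects.hash(word, count); }

    // Readable output when an assertion fails.
    @Override
    public String toString() { return "WordCountPojo{" + word + ", " + count + "}"; }
}
```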

This is the test I wrote:

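//Imports for the test (Flink 1.11, JUnit 5). The harness classes come from the
//flink-streaming-java test-jar; WordCountPojo and Counter are my own classes.
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.jupiter.api.Test;

import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;

@SuppressWarnings("unchecked") //for the StreamRecord casts inside the comparators
@Test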
void testCounter() throws Exception {
    //create a WindowOperator<Key, Input, Accumulator, Output, Window>
    WindowOperator<String, WordCountPojo, Iterable<WordCountPojo>, WordCountPojo, TimeWindow> operator =
            new WindowOperator<>(
                    TumblingProcessingTimeWindows.of(Time.seconds(3)), //window assigner
                    new TimeWindow.Serializer(), //window serializer
                    WordCountPojo::getWord, //key selector
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), //key serializer
                    new ListStateDescriptor<>( //window state descriptor (in order to accumulate events inside the window)
                            "window-content",
                            TypeInformation.of(WordCountPojo.class).createSerializer(new ExecutionConfig())), //input serializer
                    new InternalIterableProcessWindowFunction<>(new Counter()), //my custom ProcessWindowFunction to invoke
                    ProcessingTimeTrigger.create(), //window trigger
                    0, //allowed lateness
                    null); //late data output tag (no side output for late data)

    //Flink Test Harness
    OneInputStreamOperatorTestHarness<WordCountPojo, WordCountPojo> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(operator, WordCountPojo::getWord, BasicTypeInfo.STRING_TYPE_INFO);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();
    harness.setProcessingTime(10);

    //Push data into window
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("or", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("not", 1)));

    harness.setProcessingTime(3500); //Set processing time in order to trigger the window

    //Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("or", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("not", 1), 2999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    //push other WordCountPojos to test global counting
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));

    harness.setProcessingTime(7000); //trigger the window again

    //Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 2), 5999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 2), 5999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    harness.close();
}
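The timestamps 2999 and 5999 in the expected records are each window's maxTimestamp(), which the WindowOperator attaches to everything the window function emits. The plain-Java sketch below (not part of the test, and not Flink API) reproduces that arithmetic for a 3-second tumbling window with zero offset; the start formula mirrors Flink's TimeWindow.getWindowStartWithOffset. Since ProcessingTimeTrigger registers a timer at maxTimestamp(), advancing the harness to 3500 fires the window [0, 3000) and advancing to 7000 fires [3000, 6000).

```java
// Framework-free sketch of tumbling-window assignment (illustration only).
public class TumblingWindowMath {
    // Same formula as Flink's TimeWindow.getWindowStartWithOffset(ts, offset, size).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // A window covers [start, start + size); its maxTimestamp is end - 1.
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, 0, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 3000; // Time.seconds(3) expressed in milliseconds
        // Elements pushed at processing time 10 land in [0, 3000)
        System.out.println(maxTimestamp(10, size));
        // Elements pushed at processing time 3500 land in [3000, 6000)
        System.out.println(maxTimestamp(3500, size));
    }
}
```

Running main prints 2999 and then 5999, the timestamps used in the expected StreamRecords.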

Attention points:

  • The accumulator type of the WindowOperator is Iterable<WordCountPojo>, NOT simply WordCountPojo. This is because my Counter's process() method receives an Iterable rather than a single WordCountPojo (remember that Counter extends ProcessWindowFunction).
  • The WindowOperator's state descriptor parameter is a ListStateDescriptor, which means the window simply collects the incoming WordCountPojos into a list. (The WindowOperatorTest example uses a ReducingStateDescriptor that reduces by summing, but I don't need that, because the counting is done by the Counter function, which is exactly the function I want to test.)
  • The WindowOperator's internal window function parameter is of type InternalIterableProcessWindowFunction. This function wraps my Counter function and is invoked when the window is triggered. Because the window accumulates an Iterable<WordCountPojo> collected through the ListStateDescriptor, that Iterable is passed as the input parameter of Counter's process() method when it is invoked.
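To make these three points concrete, here is a framework-free model of what the operator does for each key (plain Java, not Flink API). A per-key buffer stands in for the ListState, fire() stands in for InternalIterableProcessWindowFunction handing the Iterable to Counter.process(), and a map that is never cleared stands in for per-key state that survives across windows. How Counter really keeps its running total is an assumption, since the answer does not show it; in Flink it could, for example, use the state returned by context.globalState().

```java
import java.util.*;

// Hypothetical model of WindowOperator + ListState + Counter (illustration only).
public class WindowCountModel {
    // key -> elements buffered for the current window. The real operator keeps
    // one buffer per (key, window); this model only tracks the current window.
    private final Map<String, List<Integer>> windowContents = new TreeMap<>();
    // key -> running total that is NOT reset when a window fires
    private final Map<String, Integer> globalCounts = new HashMap<>();

    void processElement(String word, int count) {
        windowContents.computeIfAbsent(word, k -> new ArrayList<>()).add(count);
    }

    // Calls the counter once per key with that key's buffered Iterable,
    // returning "word=total" strings in word order.
    List<String> fire() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : windowContents.entrySet()) {
            out.add(counterProcess(e.getKey(), e.getValue()));
        }
        windowContents.clear(); // the next tumbling window starts empty
        return out;
    }

    // Hypothetical Counter.process(): sum the Iterable, add it to the running total.
    private String counterProcess(String word, Iterable<Integer> elements) {
        int sum = 0;
        for (int c : elements) sum += c;
        int total = globalCounts.merge(word, sum, Integer::sum);
        return word + "=" + total;
    }

    public static void main(String[] args) {
        WindowCountModel model = new WindowCountModel();
        for (String w : new String[] {"to", "be", "or", "not"}) model.processElement(w, 1);
        System.out.println(model.fire()); // first window
        model.processElement("to", 1);
        model.processElement("be", 1);
        System.out.println(model.fire()); // second window
    }
}
```

Running main prints [be=1, not=1, or=1, to=1] and then [be=2, to=2], the same word/count pairs the test expects, in the word order its comparator uses.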
answered Oct 18 '25 at 07:10 by Vin