I am trying to create a JUnit test for a Flink streaming job that writes data to a Kafka topic and reads it back from the same topic, using FlinkKafkaProducer09 and FlinkKafkaConsumer09 respectively. I pass test data to the producer:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
I then check that the same data comes back from the consumer:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
Here resultSink is a TestListResultSink.
When I print the stream, I can see the expected data coming from the consumer. However, I never get a JUnit result, because the consumer keeps running after the last message has been read, so execution never reaches the assertion.
Is there any way in Flink or FlinkKafkaConsumer09 to stop the process, or to run it only for a specific time?
The underlying problem is that streaming programs are usually not finite and run indefinitely.
The best way, at least for the moment, is to insert a special control message into your stream that lets the source terminate properly (it simply stops reading more data by leaving its reading loop). Flink then tells all downstream operators that they can stop once they have consumed all data.
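The control-message idea can be sketched in plain Java, without any Flink or Kafka dependency. This is only a model of the reading loop, not Flink code: the class name ControlMessageDemo, the method readUntilControlMessage, and the sentinel value "__END__" are all hypothetical names chosen for illustration. In Flink itself, one hook that may serve this purpose is the isEndOfStream method of the DeserializationSchema handed to the Kafka consumer; whether it fits depends on your Flink version, so treat that as an assumption to verify.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ControlMessageDemo {
    // Hypothetical sentinel; pick a value that cannot occur in real data.
    static final String END_OF_STREAM = "__END__";

    // Models the reading loop of a source: collect records until the
    // control message arrives, then leave the loop so the source ends.
    static List<String> readUntilControlMessage(Iterator<String> topic) {
        List<String> emitted = new ArrayList<>();
        while (topic.hasNext()) {
            String record = topic.next();
            if (END_OF_STREAM.equals(record)) {
                break; // normal termination, not a failure
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The test data is followed by the control message; anything
        // written after it is never read.
        List<String> topic =
                Arrays.asList("tom", "jerry", "bill", END_OF_STREAM, "ignored");
        System.out.println(readUntilControlMessage(topic.iterator()));
    }
}
```

In a test built this way, the producer side would append the control message after "tom", "jerry", "bill", so the job finishes on its own and the assertion on the collected result can run.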
Alternatively, you can throw a special exception in your source (e.g. after some time), so that you can distinguish a "proper" termination from a failure case by checking the error cause. Note that throwing an exception in the source fails the program, so the test has to catch that failure and inspect it.
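A minimal sketch of the exception-based variant, again in plain Java rather than Flink code. All names here (StopExceptionDemo, IntentionalStopException, runSource, runJob, isIntentionalStop) are hypothetical, and runJob only imitates how a framework might wrap the source's exception in its own failure. The point it illustrates is walking the cause chain to tell an intentional stop from a real error.

```java
import java.util.ArrayList;
import java.util.List;

class StopExceptionDemo {
    // Hypothetical marker exception that signals an intentional stop.
    static class IntentionalStopException extends RuntimeException {
        IntentionalStopException(String message) { super(message); }
    }

    // Models a source: emits the test data, then idles until the deadline
    // passes and throws the marker exception.
    static void runSource(List<String> sink, long deadlineMillis) {
        String[] data = {"tom", "jerry", "bill"};
        int next = 0;
        while (true) {
            if (System.currentTimeMillis() >= deadlineMillis) {
                throw new IntentionalStopException("time limit reached");
            }
            if (next < data.length) {
                sink.add(data[next++]);
            } else {
                try {
                    Thread.sleep(10); // nothing left to emit; wait for the deadline
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Imitates a framework that wraps any source failure in its own exception.
    static void runJob(List<String> sink, long timeoutMillis) {
        try {
            runSource(sink, System.currentTimeMillis() + timeoutMillis);
        } catch (RuntimeException e) {
            throw new RuntimeException("Job execution failed", e);
        }
    }

    // Walks the cause chain looking for the marker exception.
    static boolean isIntentionalStop(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof IntentionalStopException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        try {
            runJob(sink, 200);
        } catch (RuntimeException e) {
            if (!isIntentionalStop(e)) {
                throw e; // a real failure: let the test fail
            }
        }
        System.out.println(sink);
    }
}
```

In a JUnit test, the assertion on the collected result would go after the catch block, and any exception that is not an intentional stop would be rethrown so the test still fails on genuine errors.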