I have a Flink job, which I am integration testing using the approach described here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing
The job takes its input from two sources, which are combined in a CoFlatMapFunction. In the test environment, I'm currently using two simple SourceFunctions to emit the values; however, this gives me no control over the order in which the events are emitted, and I need that control to properly test the functionality of the job.
How can I modify my test to ensure one source function emits all of its data before the second?
I've seen the approach suggested in "Integration test for complex topology (multiple inputs) in Flink". That is fine for unit testing, but I'm looking for a solution that lets me integration test the entire job.
I would suggest adding control code to your two SourceFunctions and using the MiniClusterWithClientResource. It could look like this:
// Package locations below match older Flink releases and may differ in yours;
// for example, FutureUtils lives in org.apache.flink.util.concurrent in newer versions.
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

public class JobITCase {

    private static final int NUM_TMS = 2;
    private static final int NUM_SLOTS = 2;
    private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(NUM_SLOTS)
            .setNumberTaskManagers(NUM_TMS)
            .build());

    @Test
    public void testJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
        final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");

        final DataStreamSource<Integer> input1 = env.addSource(source1);
        final DataStreamSource<Integer> input2 = env.addSource(source2);

        // Stand-in for the real job logic: print which input each element came from.
        input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
            @Override
            public Integer map1(Integer integer) {
                System.out.println("Input 1: " + integer);
                return integer;
            }

            @Override
            public Integer map2(Integer integer) {
                System.out.println("Input 2: " + integer);
                return integer;
            }
        }).print();

        // Submit without blocking on the result, so the test thread can steer the sources.
        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();
        final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());

        // Release every subtask of source1 and remember its completion future.
        final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);
        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source1, i);
            finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
        }

        // Wait until source1 has emitted everything, then release source2.
        FutureUtils.waitForAll(finishedFutures).join();

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source2, i);
        }

        jobResultFuture.join();
    }

    private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {

        // The function is serialized on submission, so the test reaches the running copies
        // through these static maps. This only works because the MiniCluster runs the tasks
        // in the same JVM as the test.
        private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
        private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();

        private final String name;

        // volatile because cancel() is called from a different thread than run()
        private volatile boolean running = true;

        private MyControllableSourceFunction(String name) {
            this.name = name;
        }

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            final int index = getRuntimeContext().getIndexOfThisSubtask();

            final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
            final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());

            // Block until the test calls startExecution for this subtask.
            startLatch.await();
            int counter = 0;

            while (running && counter < 10) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(counter++);
                }
            }

            // Tell the test that this subtask has emitted all of its data.
            finishedFuture.complete(null);
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String getId(int index) {
            return name + '_' + index;
        }

        static void startExecution(MyControllableSourceFunction source, int index) {
            final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
            startLatch.countDown();
        }

        static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
            return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
        }
    }
}
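To see the gating mechanism on its own, here is a minimal sketch in plain Java without Flink. Plain threads stand in for the source subtasks, and the names GatedSourcesDemo, startEmitter and runInOrder are made up for illustration. It is not a replacement for the test above; it only shows how the pair of static maps (start latches and finished futures, keyed by name and subtask index) lets the test thread hold back the second source until every subtask of the first one has finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

class GatedSourcesDemo {

    // Shared registries keyed by "<name>_<subtask>", mirroring the source function above.
    private static final ConcurrentMap<String, CountDownLatch> START_LATCHES = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, CompletableFuture<Void>> FINISHED = new ConcurrentHashMap<>();

    private static CountDownLatch latch(String id) {
        return START_LATCHES.computeIfAbsent(id, ignored -> new CountDownLatch(1));
    }

    private static CompletableFuture<Void> finished(String id) {
        return FINISHED.computeIfAbsent(id, ignored -> new CompletableFuture<>());
    }

    // Hypothetical stand-in for one source subtask: waits for its latch, then "emits".
    private static void startEmitter(String name, int index, int count, List<String> out) {
        final String id = name + '_' + index;
        new Thread(() -> {
            try {
                latch(id).await();              // blocked until the controller releases it
                for (int i = 0; i < count; i++) {
                    out.add(name);              // record which source produced the event
                }
                finished(id).complete(null);    // signal that this subtask is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                finished(id).completeExceptionally(e);
            }
        }).start();
    }

    // Runs both "sources" and returns the events in the order they were emitted.
    public static List<String> runInOrder(int parallelism, int eventsPerSubtask) {
        START_LATCHES.clear();
        FINISHED.clear();
        final List<String> out = Collections.synchronizedList(new ArrayList<>());

        // All subtasks of both sources start immediately but wait on their latches.
        for (int i = 0; i < parallelism; i++) {
            startEmitter("source1", i, eventsPerSubtask, out);
            startEmitter("source2", i, eventsPerSubtask, out);
        }
        releaseAndAwait("source1", parallelism);   // source1 emits everything first
        releaseAndAwait("source2", parallelism);   // only then is source2 released
        synchronized (out) {
            return new ArrayList<>(out);
        }
    }

    private static void releaseAndAwait(String name, int parallelism) {
        final List<CompletableFuture<Void>> done = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            latch(name + '_' + i).countDown();
            done.add(finished(name + '_' + i));
        }
        CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
    }

    public static void main(String[] args) {
        // prints [source1, source1, source1, source1, source2, source2, source2, source2]
        System.out.println(runInOrder(2, 2));
    }
}
```

Because both sides use computeIfAbsent, it does not matter whether the controller or the emitter touches a latch or future first. The same property is what makes the Flink test safe when startExecution is called before a source subtask has actually been deployed.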