In Flink, when two or more operators emit side outputs with the same record type, can they reuse a single OutputTag for that type?
Example:
OutputTag<A> sideOutputTag = new OutputTag<A>("side-output") {};
ProcessFunction1 processFunction1 = new ProcessFunction1(sideOutputTag);
ProcessFunction2 processFunction2 = new ProcessFunction2(sideOutputTag);
DataStream<A> output1 = input.process(processFunction1).getSideOutput(sideOutputTag);
DataStream<A> output2 = input.process(processFunction2).getSideOutput(sideOutputTag);
In this approach, will output1 also contain the records emitted by processFunction2? Or will output1 and output2 contain only the records emitted by processFunction1 and processFunction2, respectively?
Thanks!
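You can reuse the same tag, and the resulting streams will be distinct. A side output belongs to the operator that emits it, so calling getSideOutput(tag) on a given operator returns only the records that this operator emitted with that tag. For example: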
final OutputTag<String> errors = new OutputTag<String>("errors"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));
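To check this behavior locally, here is a minimal sketch, assuming the Flink 1.x DataStream API (1.12 or later, for executeAndCollect) with flink-streaming-java and flink-clients on the classpath. SharedTagDemo and Labeler are hypothetical names used only for illustration: two Labeler instances share one OutputTag, and each one side-outputs its input prefixed with its own label, so you can see which operator produced each record.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SharedTagDemo {

    // One tag instance shared by both operators. The trailing {} creates an
    // anonymous subclass so Flink can extract the String type information.
    static final OutputTag<String> SIDE = new OutputTag<String>("side-output") {};

    // Hypothetical operator: forwards each value on the main output and emits
    // a copy prefixed with its own label on the side output.
    static class Labeler extends ProcessFunction<Integer, Integer> {
        private final String label;
        private final OutputTag<String> tag;

        Labeler(String label, OutputTag<String> tag) {
            this.label = label;
            this.tag = tag;
        }

        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            out.collect(value);
            ctx.output(tag, label + ":" + value);
        }
    }

    // Runs the given stream as its own job, collects `expected` records and sorts them.
    static List<String> collectSorted(DataStream<String> stream, int expected) throws Exception {
        List<String> result = new ArrayList<>(stream.executeAndCollect(expected));
        Collections.sort(result);
        return result;
    }

    // Returns [side output of p1, side output of p2, union of both].
    public static List<List<String>> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> input = env.fromElements(1, 2, 3);

        // The same tag is passed to two different operators.
        DataStream<String> side1 = input.process(new Labeler("p1", SIDE)).getSideOutput(SIDE);
        DataStream<String> side2 = input.process(new Labeler("p2", SIDE)).getSideOutput(SIDE);

        List<List<String>> results = new ArrayList<>();
        results.add(collectSorted(side1, 3));               // only records from p1
        results.add(collectSorted(side2, 3));               // only records from p2
        results.add(collectSorted(side1.union(side2), 6));  // records from both
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> r = run();
        System.out.println("side1 = " + r.get(0));
        System.out.println("side2 = " + r.get(1));
        System.out.println("union = " + r.get(2));
    }
}
```

Each executeAndCollect call submits a separate job, which is convenient for a quick check but re-reads the source every time. In a real pipeline you would attach sinks to the side outputs (or to their union, as in the answer above) and call env.execute() once.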