This portion of my pipeline is supposed to take an input, apply the appropriate tuple tag to it, and then do further processing on the input based on the tag it receives.
When running the code below, the PCollection from the main tag (tag1) works properly. However, the additional tags (tag2, tag3) throw this error on the .apply():
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Assign Output.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.
Why does this error occur on tag2 but not on tag1? Note: if I make tag2 the main output and tag1/tag3 the additional outputs, and reorder the code accordingly, tag2 processing succeeds but tag1/tag3 throw the error.
Main Pipeline:
PCollectionTuple pct = outputPair.apply("Assign Output", ParDo.of( new output())
.withOutputTags(output.tag1, TupleTagList.of(output.tag2).and(output.tag3)));
//Tag1 Output
PCollection<KV<String, outResultPair>> tagPair1 = pct.get(output.tag1)
.apply("Process", ParDo.of( new ABCOutput()));
//Tag2 Output
PCollection<KV<String, outResultPair>> tagPair2 = pct.get(output.tag2)
.apply("Process", ParDo.of( new DEFOutput())); // Error thrown here
Supporting Classes:
//ABCOutput Class
@DefaultCoder(AvroCoder.class)
public class ABCOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
c.output( processInput(e) );
}
}
//XYZOutput Class
@DefaultCoder(AvroCoder.class)
public class XYZOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
c.output( processInput(e) );
}
}
//Output Splitter
@DefaultCoder(AvroCoder.class)
public class output {
private final static Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
final static TupleTag<KV<String,inResultPair>> tag2 = new TupleTag();
final static TupleTag<KV<String,inResultPair>> tag3 = new TupleTag();
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
KV<String, outResultPair> out = process(e);
switch(e.getValue().type){
case 1:
c.output(tag1, out);
break;
case 2:
c.output(tag2, out);
break;
case 3:
c.output(tag3, out);
break;
}
c.output();
}
}
You need to construct the TupleTags in a way that lets the Java compiler preserve their type information. Currently you are constructing them as raw types, so Beam's coder inference does not know the type of the elements output to each tag.
Change:
final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
to:
final static TupleTag<KV<String,inResultPair>> tag1 =
new TupleTag<KV<String, inResultPair>>() {};
The trailing {} is critically important here: it creates an anonymous subclass of TupleTag, and that subclass is what preserves the type information.
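To see why the anonymous subclass matters, here is a minimal sketch using only the Java standard library. The Tag class below is a hypothetical stand-in written for illustration, not Beam's TupleTag, and Beam's own implementation may differ in detail. It only shows the general Java mechanism: a plain generic instance has its type argument erased, while an anonymous subclass records the type argument in its generic superclass, where reflection can read it back.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class TypeCaptureDemo {

    // Hypothetical stand-in for a tag class; this is NOT Beam's TupleTag.
    public static class Tag<T> {
        // Returns the type argument recorded in the generic superclass,
        // or null when the instance was created without a subclass.
        public Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Plain instance: the runtime class is Tag itself, so T is erased.
        Tag<Map<String, List<Integer>>> plain = new Tag<>();

        // Anonymous subclass: the runtime class extends
        // Tag<Map<String, List<Integer>>>, so the type argument survives.
        Tag<Map<String, List<Integer>>> anonymous =
            new Tag<Map<String, List<Integer>>>() {};

        System.out.println("plain:     " + plain.capturedType());
        System.out.println("anonymous: " + anonymous.capturedType());
    }
}
```

In this sketch the plain instance yields no type, while the anonymous one yields the full parameterized Map type. That is the same kind of information a coder registry needs in order to pick a coder for a tagged output.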