We are using the Dataflow Java SDK and have a growing number of custom key classes that are almost identical.
I would like them to extend a common abstract class; however, the Dataflow SDK seems to try to instantiate the abstract class, which causes an InstantiationException:
Caused by: java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:242)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:97)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:156)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:139)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:133)
at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:108)
at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:45)
at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1218)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
at com.telstra.cdf.rmr.model.pardos.ParDoAbstractCampaignUAKeyExtractor.processElement(ParDoAbstractCampaignUAKeyExtractor.java:5
Here is our abstract class:
@DefaultCoder(AvroCoder.class)
public abstract class SuperClassKey {
    public SuperClassKey() {}

    public abstract double getSomeValue();
}
And this is the subclass:
@DefaultCoder(AvroCoder.class)
public class SubClassKey extends SuperClassKey {
    public String foo;

    public SubClassKey() {
    }

    public SubClassKey(String foo) {
        this.foo = foo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SubClassKey that = (SubClassKey) o;
        if (!foo.equals(that.foo)) return false;
        return true;
    }

    @Override
    public int hashCode() {
        return foo.hashCode();
    }

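    @Override
    public double getSomeValue() {
        // foo is a String, so it must be converted to a double;
        // parsing it here is an assumption about the intended behavior.
        return Double.parseDouble(foo);
    }
}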
I have also tried using an interface, without success.
Is it possible to share a common abstract class or interface between key classes?
The issue is most likely caused by using a PCollection<SuperClassKey> instead of a PCollection<SubClassKey>. As the stack trace shows, AvroCoder has to create an instance of the element type when decoding, so the PCollection needs to be typed with a concrete class. If type inference is not sufficient, the coder can be specified explicitly with .setCoder(AvroCoder.of(SubClassKey.class)).
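To illustrate why the declared element type matters, here is a minimal sketch in plain Java, with no Dataflow or Avro dependency. It assumes the decoder creates records reflectively through the no-argument constructor, which is what the SpecificData.newInstance frame in the stack trace suggests. The classes are simplified stand-ins for the ones in the question, and AbstractKeyDemo and tryInstantiate are hypothetical names used only for this example.

```java
import java.lang.reflect.Constructor;

public class AbstractKeyDemo {

    // Simplified stand-in for the abstract key class in the question.
    public abstract static class SuperClassKey {
        public SuperClassKey() {}

        public abstract double getSomeValue();
    }

    // Simplified stand-in for the concrete key class in the question.
    public static class SubClassKey extends SuperClassKey {
        public String foo;

        public SubClassKey() {}

        @Override
        public double getSomeValue() {
            // Placeholder value; the real meaning of this method is not known.
            return foo == null ? 0.0 : foo.length();
        }
    }

    // Hypothetical helper: creates an instance via the no-arg constructor,
    // roughly the way a reflection-based decoder would, and reports the outcome.
    public static String tryInstantiate(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            Object instance = ctor.newInstance();
            return "created " + instance.getClass().getSimpleName();
        } catch (InstantiationException e) {
            // Thrown when the declaring class is abstract.
            return "InstantiationException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Abstract type: reflection cannot create an instance.
        System.out.println(tryInstantiate(SuperClassKey.class));
        // Concrete type: instantiation succeeds.
        System.out.println(tryInstantiate(SubClassKey.class));
    }
}
```

A coder built for the abstract type hits the first case at decode time, which is why typing the PCollection with SubClassKey, or setting the coder for the concrete class explicitly, should avoid the exception. The common abstract class can still be used in helper code that only reads keys.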