I am trying to use AvroCoder to serialise a custom type that is passed around in PCollections in my pipeline. The custom type has a generic field (currently a String). When I run the pipeline, I get the AvroTypeException below, probably because of the generic field. Is building and passing the Avro Schema for the object the only way around this?
Exception in thread "main" org.apache.avro.AvroTypeException: Unknown type: T
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:116)
I have also attached my registry code for reference.
pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        return AvroCoder.of(GenericTypeClass.class);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        return Collections.singletonList(((GenericTypeClass<Object>) value).key);
    }
});
You’ve done everything right as far as setting up the CoderFactory, but Avro’s ReflectData mechanism, which AvroCoder uses to automatically generate a schema, does not work for generic types as of this writing. This is tracked as issue AVRO-1571. See also this StackOverflow question.
To allow encoding of GenericTypeClass<T> for some particular values of T, you are correct that you will have to provide some explicit schema information. There are two ways to proceed:
The first approach is to provide an explicit schema on fields of type T within your GenericTypeClass<T>, like so:
class GenericTypeClass<T> {
    // Avro requires a no-args constructor
    public GenericTypeClass() {}

    @AvroSchema("[\"string\", \"int\", ...]")
    private T genericField;
}
The drawback is that it is limited to a finite, static union schema, and requires manually inlining the JSON schema for more complex values of T.
The second approach is to build an explicit schema in your CoderFactory and pass it to AvroCoder.of(Class, Schema) when you create the AvroCoder.
pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        // The first component coder corresponds to the type parameter T.
        return AvroCoder.of(
            GenericTypeClass.class,
            schemaFromCoder(componentCoders.get(0)));
    }
    ...
});
Most of the work in this approach is converting a Coder<T> into a schema for T, which is what schemaFromCoder stands for in the snippet above. This should be easy for basic types and manageable for POJOs that ReflectData supports. It also provides a hook for ad hoc support of more difficult cases.
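To make the "easy for basic types" part concrete, here is a minimal sketch of the schema-building step, written against the plain JDK so it can be read without the SDK on the classpath. Everything in it is an assumption for illustration: the class name GenericSchemaSketch, the methods fieldSchemaJson and recordSchemaJson, and the choice to key the lookup on a Class<?> for T rather than on a Coder<T>. It also assumes the generic field is called genericField, as in the first approach, and that you pass in the package of GenericTypeClass as the namespace, since as far as I understand the record name and namespace need to match the Java class for Avro's reflection to locate it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Avro or the Dataflow SDK.
class GenericSchemaSketch {
    // Avro primitive type names for a few Java classes that T might be.
    private static final Map<Class<?>, String> PRIMITIVES = new HashMap<>();
    static {
        PRIMITIVES.put(String.class, "string");
        PRIMITIVES.put(Integer.class, "int");
        PRIMITIVES.put(Long.class, "long");
        PRIMITIVES.put(Float.class, "float");
        PRIMITIVES.put(Double.class, "double");
        PRIMITIVES.put(Boolean.class, "boolean");
    }

    // Returns the JSON schema for a value of the given type,
    // or fails loudly if the sketch has no mapping for it.
    static String fieldSchemaJson(Class<?> type) {
        String name = PRIMITIVES.get(type);
        if (name == null) {
            throw new IllegalArgumentException(
                "No schema mapping for " + type.getName());
        }
        return "\"" + name + "\"";
    }

    // Builds a record schema for GenericTypeClass<T> with T fixed to typeArg.
    // The field name "genericField" is an assumption taken from the example above.
    static String recordSchemaJson(String namespace, Class<?> typeArg) {
        return "{\"type\":\"record\","
            + "\"name\":\"GenericTypeClass\","
            + "\"namespace\":\"" + namespace + "\","
            + "\"fields\":[{\"name\":\"genericField\",\"type\":"
            + fieldSchemaJson(typeArg) + "}]}";
    }

    public static void main(String[] args) {
        // "com.example" is a placeholder package name.
        System.out.println(recordSchemaJson("com.example", String.class));
    }
}
```

The resulting string could then be parsed with new Schema.Parser().parse(json) and handed to AvroCoder.of(GenericTypeClass.class, schema). A real schemaFromCoder would inspect the component coder to decide which type T is; the lookup table here only stands in for that decision, and the exception branch is where ad hoc handling of harder cases would go.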