I am building a pipeline that reads Avro generic records. To pass GenericRecord between stages I need to register AvroCoder. The documentation says that if I use GenericRecord, the schema argument can be arbitrary: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-
However, when I pass an empty schema to AvroCoder.of(Class, Schema), it throws an exception at run time. Is there a way to create an AvroCoder for GenericRecord that does not require a schema? In my case, each GenericRecord has an embedded schema.
The exception and stack trace:
```text
Exception in thread "main" java.lang.NullPointerException
	at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
	at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
	at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
	at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
	at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
```
For reference, the Beam Javadoc describes Coder and AvroCoder as follows:
A Coder<T> defines how to encode and decode values of type T into byte streams. Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization.
A Coder using Avro binary format. Each instance of AvroCoder<T> encapsulates an Avro schema for objects of type T. The Avro schema may be provided explicitly via of(Class, Schema) or omitted via of(Class), in which case it will be inferred using Avro's ReflectData.
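To make the ReflectData remark concrete, the sketch below calls Avro's ReflectData directly on a small, hypothetical Point class to show the kind of schema that can be inferred from a class's fields. It illustrates the mechanism the Javadoc names rather than reproducing AvroCoder's internals. The point for this question: a GenericRecord carries its schema per instance (via getSchema()), not in its class, so there is nothing comparable for of(Class) to infer from.

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectSchemaDemo {
  // Hypothetical POJO: its declared fields are what ReflectData turns into a record schema.
  static class Point {
    int x;
    int y;
  }

  public static Schema inferPointSchema() {
    // Builds a record schema from Point's non-static, non-transient fields.
    return ReflectData.get().getSchema(Point.class);
  }

  public static void main(String[] args) {
    System.out.println(inferPointSchema().toString(true));
  }
}
```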
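I had a similar case and solved it with a custom coder. The simplest (but less efficient) solution is to encode the schema along with each record. If your schemas do not change often, you can also benefit from caching: the coder below keeps one AvroCoder per schema hash, so it does not rebuild an AvroCoder for every record (the schema itself is still written with each record).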
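```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
  public static GenericRecordCoder of() {
    return new GenericRecordCoder();
  }
  // One AvroCoder per schema hash, cached for the lifetime of the JVM.
  private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();
  @Override
  public void encode(GenericRecord value, OutputStream outStream) throws IOException {
    String schemaString = value.getSchema().toString();
    String schemaHash = getHash(schemaString);
    StringUtf8Coder.of().encode(schemaString, outStream); // full schema travels with every record
    StringUtf8Coder.of().encode(schemaHash, outStream);
    AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
        s -> AvroCoder.of(value.getSchema()));
    coder.encode(value, outStream);
  }
  @Override
  public GenericRecord decode(InputStream inStream) throws IOException {
    String schemaString = StringUtf8Coder.of().decode(inStream);
    String schemaHash = StringUtf8Coder.of().decode(inStream);
    AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
        s -> AvroCoder.of(new Schema.Parser().parse(schemaString)));
    return coder.decode(inStream);
  }
  // getHash was not defined in the original answer; a SHA-256 hex digest of the schema JSON is one choice.
  private static String getHash(String schemaString) {
    try {
      return new BigInteger(1, MessageDigest.getInstance("SHA-256")
          .digest(schemaString.getBytes(StandardCharsets.UTF_8))).toString(16);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is not available", e);
    }
  }
}
```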
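The core idea of that coder (write the schema first, then the record body) does not depend on Beam. As a minimal sketch using only the Avro library, with a hypothetical two-field User schema, the same layout can be written and read back like this. It uses Avro's own string encoding for the schema instead of StringUtf8Coder, so its bytes are not interchangeable with GenericRecordCoder's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaPrefixedRecords {
  // Writes the schema JSON first, then the record in Avro binary using that schema.
  public static byte[] encode(GenericRecord record) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    encoder.writeString(record.getSchema().toString());
    new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  // Reads the schema back first, so the reader needs no schema up front.
  public static GenericRecord decode(byte[] bytes) throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    Schema schema = new Schema.Parser().parse(decoder.readString());
    return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
  }

  // Hypothetical sample record used for illustration.
  public static GenericRecord sampleUser() {
    Schema schema = SchemaBuilder.record("User").fields()
        .requiredString("name")
        .requiredInt("age")
        .endRecord();
    GenericRecord user = new GenericData.Record(schema);
    user.put("name", "alice");
    user.put("age", 30);
    return user;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(decode(encode(sampleUser())));
  }
}
```

The trade-off is the one described above: every record pays for its full schema in bytes, which is what the registry-based variant avoids.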
This works, but in practice I did it slightly differently, using an external schema registry (you could build one on top of Datastore, for example). With a registry you don't need to serialize and deserialize the schema with every record; only its name is written. The code looks like this:
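```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// SchemaRegistry is my own class backed by an external store (e.g. Datastore); it is not shown here.
// Records are keyed by schema full name, so each name must map to exactly one schema.
public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
  public static GenericRecordCoder of() {
    return new GenericRecordCoder();
  }

  private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

  @Override
  public void encode(GenericRecord value, OutputStream outStream) throws IOException {
    // Make sure readers on other workers can look this schema up by name.
    SchemaRegistry.registerIfAbsent(value.getSchema());
    String schemaName = value.getSchema().getFullName();
    // Only the name travels with the record, not the whole schema.
    StringUtf8Coder.of().encode(schemaName, outStream);
    AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
        s -> AvroCoder.of(value.getSchema()));
    coder.encode(value, outStream);
  }

  @Override
  public GenericRecord decode(InputStream inStream) throws IOException {
    String schemaName = StringUtf8Coder.of().decode(inStream);
    // Fetch the schema from the registry the first time this name is seen in this JVM.
    AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
        s -> AvroCoder.of(SchemaRegistry.get(schemaName)));
    return coder.decode(inStream);
  }
}
```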
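SchemaRegistry in the coder above is the answerer's own class and is not shown. For experimenting inside a single JVM, such as a local test, a hypothetical in-memory stand-in with the same two static methods might look like the sketch below. It does not solve the distributed case: on a real runner, workers run in separate JVMs, so the registry has to be backed by shared storage (the Datastore suggestion above).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.avro.Schema;

// Hypothetical in-memory registry; only visible within the current JVM.
public final class SchemaRegistry {
  private static final ConcurrentMap<String, Schema> SCHEMAS = new ConcurrentHashMap<>();

  private SchemaRegistry() {}

  // Stores the schema under its full name; rejects a different schema reusing that name,
  // because the coder caches AvroCoders by full name and would otherwise mix them up.
  public static void registerIfAbsent(Schema schema) {
    Schema existing = SCHEMAS.putIfAbsent(schema.getFullName(), schema);
    if (existing != null && !existing.equals(schema)) {
      throw new IllegalStateException(
          "A different schema is already registered as " + schema.getFullName());
    }
  }

  // Looks up a schema by full name and fails loudly if it was never registered.
  public static Schema get(String fullName) {
    Schema schema = SCHEMAS.get(fullName);
    if (schema == null) {
      throw new IllegalArgumentException("Unknown schema: " + fullName);
    }
    return schema;
  }
}
```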
The usage is pretty straightforward:
```java
PCollection<GenericRecord> inputCollection = pipeline
    .apply(AvroIO
        .parseGenericRecords(t -> t)
        .withCoder(GenericRecordCoder.of())
        .from(...));
```
After reviewing the code for AvroCoder, I do not think the documentation is correct there. Your AvroCoder instance will need a way to figure out the schema for your Avro records, and likely the only way to do that is by providing one.
So, I'd recommend calling AvroCoder.of(GenericRecord.class, schema), where schema is the correct schema for the GenericRecord objects in your PCollection.
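Following that recommendation, here is a minimal sketch, assuming a Beam release that still ships org.apache.beam.sdk.coders.AvroCoder (newer releases move Avro support into a separate Avro extension module) and a hypothetical User schema. It builds the coder from a real record schema and round-trips one record through Beam's CoderUtils, which is roughly the encode/decode work a runner performs when it materializes a PCollection.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

public class ExplicitSchemaCoder {
  // Hypothetical schema assumed to be shared by every GenericRecord in the PCollection.
  public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
      .requiredString("name")
      .requiredInt("age")
      .endRecord();

  // The coder is built from the actual record schema rather than an empty one.
  public static AvroCoder<GenericRecord> coder() {
    return AvroCoder.of(GenericRecord.class, USER_SCHEMA);
  }

  public static GenericRecord sampleUser() {
    GenericRecord user = new GenericData.Record(USER_SCHEMA);
    user.put("name", "alice");
    user.put("age", 30);
    return user;
  }

  // Encodes the record to bytes and decodes it again with the same coder.
  public static GenericRecord roundTrip(GenericRecord record) throws CoderException {
    AvroCoder<GenericRecord> coder = coder();
    byte[] bytes = CoderUtils.encodeToByteArray(coder, record);
    return CoderUtils.decodeFromByteArray(coder, bytes);
  }

  public static void main(String[] args) throws CoderException {
    System.out.println(roundTrip(sampleUser()));
  }
}
```

In a pipeline, the same coder would be attached with setCoder(ExplicitSchemaCoder.coder()) on the PCollection. This only fits when all records share one schema; with per-record schemas, a custom coder like the ones in the other answer is needed.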