
Apache Beam Coder for GenericRecord

I am building a pipeline that reads Avro GenericRecords. To pass GenericRecord between stages I need to register an AvroCoder. The documentation says that if I use a generic record, the schema argument can be arbitrary: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-

However, when I pass an empty schema to AvroCoder.of(Class, Schema), it throws an exception at runtime. Is there a way to create an AvroCoder for GenericRecord that does not require a schema? In my case, each GenericRecord has an embedded schema.

The exception and stacktrace:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
Nutel asked Dec 13 '18


People also ask

What is a Coder in Apache Beam?

A Coder<T> defines how to encode and decode values of type T into byte streams. Coder instances are serialized during job creation and deserialized before use; this is generally done by serializing the coder object via Java serialization.
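
For illustration, here is a minimal sketch of that contract, round-tripping a value through a byte stream with Beam's built-in StringUtf8Coder (the class and variable names below are placeholders, not from the original page):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class CoderRoundTrip {
    public static void main(String[] args) throws Exception {
        StringUtf8Coder coder = StringUtf8Coder.of();

        // Encode a value of type T (here, String) into a byte stream.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode("hello beam", out);

        // Decode the same bytes back into a value.
        String decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(decoded); // prints: hello beam
    }
}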

What is AvroCoder?

A Coder using the Avro binary format. Each instance of AvroCoder<T> encapsulates an Avro schema for objects of type T. The Avro schema may be provided explicitly via of(Class, Schema) or omitted via of(Class), in which case it will be inferred using Avro's ReflectData.
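
As a hedged sketch of those two factory methods (the User class and the inline schema below are placeholders for illustration):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;

public class AvroCoderExamples {
    // A placeholder POJO; AvroCoder.of(Class) infers its schema via Avro's ReflectData.
    public static class User {
        public String name;
        public int age;
    }

    public static void main(String[] args) {
        // Schema inferred from the class.
        AvroCoder<User> reflectCoder = AvroCoder.of(User.class);

        // Schema provided explicitly; for GenericRecord a concrete record schema is needed.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}");
        AvroCoder<GenericRecord> genericCoder = AvroCoder.of(GenericRecord.class, schema);
    }
}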


2 Answers

I had a similar case and solved it with a custom coder. The simplest (but less efficient) solution is to encode the schema along with each record. If your schemas are not too volatile, you can benefit from caching the per-schema coders.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
    public static GenericRecordCoder of() {
        return new GenericRecordCoder();
    }

    // Cache of per-schema AvroCoders, keyed by a hash of the schema string.
    private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

    @Override
    public void encode(GenericRecord value, OutputStream outStream) throws IOException {
        // Write the schema and its hash (the cache key) ahead of the record bytes.
        String schemaString = value.getSchema().toString();
        String schemaHash = getHash(schemaString);
        StringUtf8Coder.of().encode(schemaString, outStream);
        StringUtf8Coder.of().encode(schemaHash, outStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
            s -> AvroCoder.of(value.getSchema()));
        coder.encode(value, outStream);
    }

    @Override
    public GenericRecord decode(InputStream inStream) throws IOException {
        String schemaString = StringUtf8Coder.of().decode(inStream);
        String schemaHash = StringUtf8Coder.of().decode(inStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash,
            s -> AvroCoder.of(new Schema.Parser().parse(schemaString)));
        return coder.decode(inStream);
    }

    // getHash was not shown in the original answer; any stable digest of the schema string works.
    private static String getHash(String schemaString) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(schemaString.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}

While this solves the task, I actually did it slightly differently, using an external schema registry (you can build one on top of Datastore, for example). In that case you don't need to serialize and deserialize the schema with every record. The code looks like:

// Same imports as the previous coder, minus the hashing utilities.
public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
    public static GenericRecordCoder of() {
        return new GenericRecordCoder();
    }

    // Cache of per-schema AvroCoders, keyed by the schema's full name.
    private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

    @Override
    public void encode(GenericRecord value, OutputStream outStream) throws IOException {
        // SchemaRegistry is the external registry mentioned above (not shown here);
        // only the schema's full name is written with each record.
        SchemaRegistry.registerIfAbsent(value.getSchema());
        String schemaName = value.getSchema().getFullName();
        StringUtf8Coder.of().encode(schemaName, outStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
            s -> AvroCoder.of(value.getSchema()));
        coder.encode(value, outStream);
    }

    @Override
    public GenericRecord decode(InputStream inStream) throws IOException {
        // Look the schema up by name in the external registry instead of reading it from the stream.
        String schemaName = StringUtf8Coder.of().decode(inStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName,
            s -> AvroCoder.of(SchemaRegistry.get(schemaName)));
        return coder.decode(inStream);
    }
}
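
The SchemaRegistry class above is the answer's own external abstraction and is not shown; a hypothetical in-memory stand-in, for illustration only, could look like the following (a real implementation would be backed by a shared store such as Datastore, since a static map is not shared across distributed workers):

import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;

// Hypothetical stand-in for the external schema registry referenced above.
public class SchemaRegistry {
    private static final ConcurrentHashMap<String, Schema> schemas = new ConcurrentHashMap<>();

    public static void registerIfAbsent(Schema schema) {
        schemas.putIfAbsent(schema.getFullName(), schema);
    }

    public static Schema get(String fullName) {
        return schemas.get(fullName);
    }
}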

The usage is pretty straightforward:

PCollection<GenericRecord> inputCollection = pipeline
    .apply(AvroIO
           .parseGenericRecords(t -> t)
           .withCoder(GenericRecordCoder.of())
           .from(...));
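
If you prefer not to call withCoder on every read, a sketch (not from the original answer) of registering the coder once on the pipeline's CoderRegistry, so that every PCollection<GenericRecord> picks it up; `options` is a placeholder for your PipelineOptions:

Pipeline pipeline = Pipeline.create(options);
pipeline.getCoderRegistry()
    .registerCoderForClass(GenericRecord.class, GenericRecordCoder.of());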
Valentin answered Oct 22 '22


After reviewing the code for AvroCoder, I do not think the documentation is correct there. Your AvroCoder instance will need a way to figure out the schema for your Avro records - and likely the only way to do that is by providing one.

So, I'd recommend calling AvroCoder.of(GenericRecord.class, schema), where schema is the correct schema for the GenericRecord objects in your PCollection.
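
For example (a sketch; `records` and `schema` are placeholders for your PCollection and its actual Avro schema):

// Assuming `schema` is the Avro Schema shared by every GenericRecord in the collection
records.setCoder(AvroCoder.of(GenericRecord.class, schema));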

Pablo answered Oct 22 '22