How to define a LogicalType in Avro (Java)

Tags:

java

avro

I need to be able to mark some fields in the Avro schema so that they are encrypted at serialization time.

A logicalType lets me mark those fields, and together with a custom conversion it should let Avro encrypt them transparently.


I had trouble finding documentation on how to define and use a new logicalType in Avro (avro_1.8.2#Logical+Types).
So I decided to share what I found here as an answer, to make life easier for anyone else working on this and to get some feedback in case I'm doing something wrong.

asked Feb 28 '18 16:02 by enrico



1 Answer

First of all, I defined a logicalType as follows:

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class EncryptedLogicalType extends LogicalType {
    // The name used to reference this logical type from the schema
    public static final String ENCRYPTED_LOGICAL_TYPE_NAME = "encrypted";

    EncryptedLogicalType() {
        super(ENCRYPTED_LOGICAL_TYPE_NAME);
    }

    // Only bytes-backed fields may carry the "encrypted" logical type
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.BYTES) {
            throw new IllegalArgumentException(
                    "Logical type 'encrypted' must be backed by bytes");
        }
    }
}

Then a new conversion:

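import java.nio.ByteBuffer;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class EncryptedConversion extends Conversion<ByteBuffer> {
    // A single instance shared by every use of the conversion. This has to change if the conversion
    //   needs some runtime information (e.g.: an encryption key / a tenant_ID). If so, the get() method should
    //   return the appropriate conversion per key.
    private static final EncryptedConversion INSTANCE = new EncryptedConversion();
    public static EncryptedConversion get() { return INSTANCE; }
    private EncryptedConversion() { super(); }

    // This conversion operates on ByteBuffer and returns ByteBuffer
    @Override
    public Class<ByteBuffer> getConvertedType() { return ByteBuffer.class; }

    @Override
    public String getLogicalTypeName() { return EncryptedLogicalType.ENCRYPTED_LOGICAL_TYPE_NAME; }

    // fromBytes and toBytes have to be overridden because this conversion works on bytes. Conversions
    //  backed by other types override the matching methods instead (e.g. fromInt/toInt), and the supported
    //  types must then also be updated in EncryptedLogicalType#validate(Schema schema).

    // Called when reading (deserializing): the stored bytes are ciphertext, so decrypt them.
    @Override
    public ByteBuffer fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
        ByteBuffer decryptedValue = __decryptionLogic__(value);
        return decryptedValue;
    }

    // Called when writing (serializing): encrypt the plaintext before it is written.
    @Override
    public ByteBuffer toBytes(ByteBuffer value, Schema schema, LogicalType type) {
        ByteBuffer encryptedValue = __encryptionLogic__(value);
        return encryptedValue;
    }

    // Placeholders: plug in the actual cipher here.
    private static ByteBuffer __encryptionLogic__(ByteBuffer plainText) {
        throw new UnsupportedOperationException("encryption not implemented");
    }

    private static ByteBuffer __decryptionLogic__(ByteBuffer cipherText) {
        throw new UnsupportedOperationException("decryption not implemented");
    }
}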
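The conversion leaves the actual cipher as a placeholder. One possible way to fill it, sketched here with only the JDK's javax.crypto, is AES-GCM with a fresh random 12-byte IV prepended to each ciphertext. `AesGcmBytes` and its per-tenant key map are hypothetical illustrations of the "runtime information (e.g.: an encryption key / a tenant_ID)" mentioned in the conversion's comments; real keys would come from a key-management system rather than being generated in memory. Because `toBytes`/`fromBytes` declare no checked exceptions, crypto failures are wrapped in `IllegalStateException`.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical helper (not part of Avro) that could back the encryption placeholders.
final class AesGcmBytes {
    private static final int IV_LENGTH = 12;   // 96-bit nonce, the usual size for GCM
    private static final int TAG_BITS = 128;   // length of the GCM authentication tag
    private static final SecureRandom RANDOM = new SecureRandom();

    // Hypothetical in-memory key store: creates a random AES key the first time a tenant is seen.
    private static final Map<String, SecretKey> KEYS = new ConcurrentHashMap<>();

    static SecretKey keyFor(String tenantId) {
        return KEYS.computeIfAbsent(tenantId, id -> {
            try {
                KeyGenerator generator = KeyGenerator.getInstance("AES");
                generator.init(256);
                return generator.generateKey();
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    // Candidate body for toBytes(): returns IV followed by ciphertext and tag.
    static ByteBuffer encrypt(ByteBuffer plain, SecretKey key) {
        try {
            byte[] iv = new byte[IV_LENGTH];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            ByteBuffer out = ByteBuffer.allocate(IV_LENGTH + cipher.getOutputSize(plain.remaining()));
            out.put(iv);
            cipher.doFinal(plain.duplicate(), out); // duplicate() leaves the caller's position alone
            out.flip();
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    // Candidate body for fromBytes(): reads the IV, then decrypts and verifies the tag.
    static ByteBuffer decrypt(ByteBuffer stored, SecretKey key) {
        try {
            ByteBuffer in = stored.duplicate();
            byte[] iv = new byte[IV_LENGTH];
            in.get(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            ByteBuffer out = ByteBuffer.allocate(cipher.getOutputSize(in.remaining()));
            cipher.doFinal(in, out);
            out.flip();
            return out;
        } catch (GeneralSecurityException e) {
            // A wrong key or tampered data lands here (AEADBadTagException).
            throw new IllegalStateException("decryption failed", e);
        }
    }
}
```

Inside the conversion, `toBytes` could return `AesGcmBytes.encrypt(value, key)` and `fromBytes` could return `AesGcmBytes.decrypt(value, key)`. With a random IV the same plaintext produces different bytes on every write, and the GCM tag makes decryption fail loudly instead of returning garbage when the key is wrong or the data was altered.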

The .avsc schema file will be similar to:

{
    "name": "MyMessageWithEncryptedField",
    "type": "record",
    "fields": [
        {"name": "payload", "type": {"type": "bytes", "logicalType": "encrypted"}},
        ...

Finally, in the MyMessageWithEncryptedField.java class generated from the schema file, I added a method that returns the conversion:

@Override
public Conversion<?> getConversion(int fieldIndex) {
    // This allows a more flexible conversion lookup, so we don't have to code it per field.
    Schema fieldSchema = SCHEMA$.getFields().get(fieldIndex).schema();
    LogicalType logicalType = fieldSchema.getLogicalType();
    // Compare names with equals(), not ==, so the check does not depend on String identity.
    if (logicalType != null
            && EncryptedLogicalType.ENCRYPTED_LOGICAL_TYPE_NAME.equals(logicalType.getName())) {
        // Here we could pass runtime information to get(), e.g.: a tenantId found in the data structure.
        return EncryptedConversion.get();
    }
    return null;
}
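Comparing a logical type name with `==` checks whether both sides are the same `String` object, not whether they hold the same characters, so it only succeeds when the name happens to be the identical (for example interned) instance. A small JDK-only illustration; `LogicalTypeNameCheck` is a hypothetical class written for this example:

```java
// Hypothetical demo class: contrasts reference comparison with content comparison.
final class LogicalTypeNameCheck {
    static final String ENCRYPTED_LOGICAL_TYPE_NAME = "encrypted";

    // Reference comparison: true only if both sides are the very same String object.
    static boolean sameReference(String name) {
        return name == ENCRYPTED_LOGICAL_TYPE_NAME;
    }

    // Content comparison; calling equals on the constant also makes a null name safe.
    static boolean isEncrypted(String name) {
        return ENCRYPTED_LOGICAL_TYPE_NAME.equals(name);
    }

    public static void main(String[] args) {
        // Simulates a name built at runtime rather than taken from a string literal.
        String parsed = new String("encrypted");
        System.out.println(sameReference(parsed)); // false
        System.out.println(isEncrypted(parsed));   // true
        System.out.println(isEncrypted(null));     // false
    }
}
```

Writing the check as `CONSTANT.equals(name)` keeps it correct regardless of where the name string was created.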

To make it work, I still have to register the logical type at runtime:

LogicalTypes.register(EncryptedLogicalType.ENCRYPTED_LOGICAL_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
    private final LogicalType encryptedLogicalType = new EncryptedLogicalType();
    @Override
    public LogicalType fromSchema(Schema schema) {
        return encryptedLogicalType;
    }
});

A few notes:

  • if your logicalType needs other properties passed in from the schema definition, you can modify the LogicalType class following the example of LogicalTypes.Decimal (lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java in the Avro source tree)
  • the last piece of code (the registration) currently runs before my logic starts, but I plan to move it into a static block inside the class generated from the schema (MyMessageWithEncryptedField.java)
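Moving the registration into a static block relies on the JVM running a class's static initializers exactly once, in textual order, when the class is first initialized. The JDK-only sketch below illustrates both properties; `FakeLogicalTypeRegistry` and `GeneratedRecordStandIn` are hypothetical stand-ins, not Avro's `LogicalTypes` or a real generated class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Avro's logical-type registry (NOT the real LogicalTypes API):
// it only records which names were registered and how often register() was called.
final class FakeLogicalTypeRegistry {
    static final Map<String, String> FACTORIES = new ConcurrentHashMap<>();
    static final AtomicInteger REGISTER_CALLS = new AtomicInteger();

    static void register(String name, String factoryDescription) {
        REGISTER_CALLS.incrementAndGet();
        FACTORIES.put(name, factoryDescription);
    }
}

// Hypothetical stand-in for the generated MyMessageWithEncryptedField class.
final class GeneratedRecordStandIn {
    // Evaluated BEFORE the static block below, because static initializers run in textual order.
    static final boolean REGISTERED_BEFORE_BLOCK =
            FakeLogicalTypeRegistry.FACTORIES.containsKey("encrypted");

    static {
        // Runs exactly once, when the JVM first initializes this class.
        FakeLogicalTypeRegistry.register("encrypted", "factory for EncryptedLogicalType");
    }

    static final boolean REGISTERED_AFTER_BLOCK =
            FakeLogicalTypeRegistry.FACTORIES.containsKey("encrypted");
}

final class StaticRegistrationDemo {
    public static void main(String[] args) {
        new GeneratedRecordStandIn();
        new GeneratedRecordStandIn(); // the static block does not run again
        System.out.println(GeneratedRecordStandIn.REGISTERED_BEFORE_BLOCK); // false
        System.out.println(GeneratedRecordStandIn.REGISTERED_AFTER_BLOCK);  // true
        System.out.println(FakeLogicalTypeRegistry.REGISTER_CALLS.get());   // 1
    }
}
```

One consequence worth checking: generated classes usually parse `SCHEMA$` in a static field initializer, so if the parser resolves `logicalType` at that point (an assumption to verify for your Avro version), the registration block would have to appear before that field. Also keep in mind that regenerating the class from the schema overwrites hand edits such as this block and the `getConversion` override.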
answered Oct 06 '22 12:10 by enrico