 

Merge two avro schemas programmatically

Tags:

java

avro

I have two similar schemas where only one nested field changes (it is called onefield in schema1 and anotherfield in schema2).

schema1

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "onefield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

schema2

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "anotherfield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

I am able to merge both schemas programmatically with Avro 1.8.0, using the AvroStorageUtils class from Pig's piggybank:

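import org.apache.avro.Schema;
import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;

// schema1 and schema2 hold the JSON texts shown above
Schema s1 = new Schema.Parser().parse(schema1);
Schema s2 = new Schema.Parser().parse(schema2);
Schema[] schemas = {s1, s2};

// fold the schemas together, starting from null
Schema mergedSchema = null;
for (Schema schema: schemas) {
    mergedSchema = AvroStorageUtils.mergeSchema(mergedSchema, schema);
}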

and use it to convert an input JSON document into an Avro or JSON representation:

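import java.nio.charset.StandardCharsets;
import tech.allegro.schema.json2avro.converter.AvroConversionException;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

JsonAvroConverter converter = new JsonAvroConverter();
try {
    // an empty JSON object as input; StandardCharsets avoids the checked UnsupportedEncodingException
    byte[] example = "{}".getBytes(StandardCharsets.UTF_8);
    byte[] avro = converter.convertToAvro(example, mergedSchema);
    byte[] json = converter.convertToJson(avro, mergedSchema);
    System.out.println(new String(json, StandardCharsets.UTF_8));
} catch (AvroConversionException e) {
    e.printStackTrace();
}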

That code prints the expected output: {"metadata":{"onefield":null,"anotherfield":null}}. The problem is that I cannot print the merged schema itself. A simple System.out.println(mergedSchema) throws the following exception:

Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: merged schema (generated by AvroStorage).merged
    at org.apache.avro.Schema$Names.put(Schema.java:1127)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:561)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:689)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:715)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:700)
    at org.apache.avro.Schema.toString(Schema.java:323)
    at org.apache.avro.Schema.toString(Schema.java:313)
    at java.lang.String.valueOf(String.java:2982)
    at java.lang.StringBuilder.append(StringBuilder.java:131)

I call it the Avro uncertainty principle :). Avro seems able to work with the merged schema, but it fails as soon as it tries to serialize the schema itself to JSON. The merge works with simpler schemas, so this looks like a bug in Avro 1.8.0 to me.

Do you know what could be happening, or how to solve it? Any workaround (e.g. an alternative Schema serializer) is welcome.

asked Apr 10 '16 11:04 by Guido


1 Answer

I ran into the same issue with the Pig util class. There are actually two bugs here:

  • Avro allows data to be serialized through GenericDatumWriter using an invalid schema.
  • The piggybank util class generates invalid schemas because it uses the same name/namespace for every merged record instead of keeping the original names.
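A dependency-free toy model shows why printing the schema fails. It is a sketch, not Avro's code: the Rec class and the toJson method are hypothetical stand-ins that mimic how Avro's Schema.toJson registers every named type by its full name (the Schema$Names.put frame in the stack trace) and rejects a second, different definition under a name it has already seen. The AvroStorage-style schema assumes, as the error message suggests, that both the outer record and the nested metadata record receive the generated name "merged schema (generated by AvroStorage).merged".

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class AvroNameCheckSketch {

    // Hypothetical, simplified stand-in for an Avro record schema (not Avro's API).
    static final class Rec {
        final String fullName; // namespace + "." + name, e.g. "foo.metadata.event"
        // field name -> nested record type, or null for a primitive field such as ["null","string"]
        final Map<String, Rec> fields = new LinkedHashMap<>();

        Rec(String fullName) { this.fullName = fullName; }

        Rec field(String name, Rec type) { fields.put(name, type); return this; }
    }

    // Mimics the bookkeeping in Avro's Schema.toJson: a named type is registered by full name the
    // first time it is written; the same definition seen again becomes a name reference, while a
    // different definition under an already registered name is rejected. (Avro compares the
    // schemas with equals(); plain identity is enough for this sketch.)
    static String toJson(Rec r, Map<String, Rec> names) {
        Rec known = names.get(r.fullName);
        if (known == r) {
            return "\"" + r.fullName + "\""; // already written: emit a reference
        }
        if (known != null) {
            throw new IllegalStateException("Can't redefine: " + r.fullName);
        }
        names.put(r.fullName, r);
        StringBuilder sb = new StringBuilder();
        sb.append("{\"type\":\"record\",\"name\":\"").append(r.fullName).append("\",\"fields\":[");
        boolean first = true;
        for (Map.Entry<String, Rec> e : r.fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            String type = e.getValue() == null ? "[\"null\",\"string\"]" : toJson(e.getValue(), names);
            sb.append("{\"name\":\"").append(e.getKey()).append("\",\"type\":").append(type).append('}');
        }
        return sb.append("]}").toString();
    }

    // Assumed shape of the AvroStorage merge result: both records share one generated name.
    static Rec avroStorageStyle() {
        String generated = "merged schema (generated by AvroStorage).merged";
        Rec inner = new Rec(generated).field("onefield", null).field("anotherfield", null);
        return new Rec(generated).field("metadata", inner);
    }

    // Merge result that keeps the original record names.
    static Rec originalNames() {
        Rec inner = new Rec("foo.metadata.event").field("onefield", null).field("anotherfield", null);
        return new Rec("foo.event").field("metadata", inner);
    }

    public static void main(String[] args) {
        System.out.println(toJson(originalNames(), new HashMap<>()));
        try {
            toJson(avroStorageStyle(), new HashMap<>());
        } catch (IllegalStateException e) {
            // prints: Can't redefine: merged schema (generated by AvroStorage).merged
            System.out.println(e.getMessage());
        }
    }
}
```

Only schema-to-JSON serialization consults this name registry in the sketch, which is consistent with data still being written successfully with the invalid schema, as the first bug above describes.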

Kite SDK's SchemaUtil handles this properly, including more complex scenarios: https://github.com/kite-sdk/kite/blob/master/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/SchemaUtil.java#L511

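    import org.kitesdk.data.spi.SchemaUtil;

    Schema mergedSchema = SchemaUtil.merge(s1, s2);
    System.out.println(mergedSchema.toString(true)); // pretty-printed JSON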

With the schemas from your example, I get the following output:

{
  "type": "record",
  "name": "event",
  "namespace": "foo",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "event",
        "namespace": "foo.metadata",
        "fields": [
          {
            "name": "onefield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "anotherfield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ]
      },
      "default": null
    }
  ]
}
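In the output above, each record keeps its own name and namespace (foo.event and foo.metadata.event), so no name is defined twice. A minimal sketch of such a name-preserving merge, using a hypothetical Rec model (not Kite's or Avro's API), unions the fields of two records that share a full name and recurses into nested records that appear under the same field name. It assumes both inputs use identical record names, as schema1 and schema2 do; a real implementation also has to cope with conflicting field types and with fields present on only one side that need a null default (here both fields are already ["null", "string"] with default null).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NamePreservingMergeSketch {

    // Hypothetical, simplified stand-in for an Avro record schema (not Avro's or Kite's API).
    static final class Rec {
        final String fullName; // namespace + "." + name, e.g. "foo.metadata.event"
        // field name -> nested record type, or null for a primitive field such as ["null","string"]
        final Map<String, Rec> fields = new LinkedHashMap<>();

        Rec(String fullName) { this.fullName = fullName; }

        Rec field(String name, Rec type) { fields.put(name, type); return this; }
    }

    // Merges two records that share a full name, keeping that name instead of generating a new one.
    static Rec merge(Rec left, Rec right) {
        if (!left.fullName.equals(right.fullName)) {
            // a real merge might build a union instead; the sketch simply refuses
            throw new IllegalArgumentException(
                "different records: " + left.fullName + " vs " + right.fullName);
        }
        Rec out = new Rec(left.fullName);
        out.fields.putAll(left.fields);
        for (Map.Entry<String, Rec> e : right.fields.entrySet()) {
            String name = e.getKey();
            Rec rightType = e.getValue();
            if (!out.fields.containsKey(name)) {
                out.fields.put(name, rightType); // field only present in the right-hand record
            } else {
                Rec leftType = out.fields.get(name);
                if (leftType != null && rightType != null) {
                    out.fields.put(name, merge(leftType, rightType)); // both nested records: recurse
                }
                // otherwise both are primitives (or mismatched); the sketch keeps the left-hand type
            }
        }
        return out;
    }

    // Compact textual form for printing and testing, e.g. "a.R{x,y:a.S{z}}".
    static String describe(Rec r) {
        StringBuilder sb = new StringBuilder(r.fullName).append('{');
        boolean first = true;
        for (Map.Entry<String, Rec> e : r.fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(e.getKey());
            if (e.getValue() != null) sb.append(':').append(describe(e.getValue()));
        }
        return sb.append('}').toString();
    }

    // Models of schema1 and schema2 from the question.
    static Rec schema1() {
        return new Rec("foo.event").field("metadata", new Rec("foo.metadata.event").field("onefield", null));
    }

    static Rec schema2() {
        return new Rec("foo.event").field("metadata", new Rec("foo.metadata.event").field("anotherfield", null));
    }

    public static void main(String[] args) {
        // prints foo.event{metadata:foo.metadata.event{onefield,anotherfield}}
        System.out.println(describe(merge(schema1(), schema2())));
    }
}
```

Reusing the left-hand record's full name at every level is the design choice that keeps the merged schema printable: each name still maps to exactly one definition.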

Hopefully this will help others.

answered Oct 25 '22 23:10 by hlagos