
Converting POJOs to generic records in confluent.io to send through a KafkaProducer

I am completely new to Kafka and Avro and am trying to use the Confluent package. We have existing POJOs that we use for JPA, and I'd like to produce an instance of one of them without manually copying each value into a generic record. I can't find how this is done in the documentation.

The examples use a generic record and set each value one by one like so:

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch(SerializationException e) {
  // may need to do something with it
}

There are several examples of deriving a schema from a class, and I found the annotations for adjusting that schema as needed. But how do I take an instance of a POJO and hand it to the serializer as is, so that the library matches up the schema from the class and copies the values into a generic record? Am I going about this all wrong? What I'd like to end up with is something like this:

String key = "key1";
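Schema schema = ReflectData.get().getSchema(myObject.getClass());
// Wished-for call: ReflectData has no getRecord(...) method; this is what I'd like to exist.
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema);

ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);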
try {
  producer.send(record);
} catch(SerializationException e) {
  // may need to do something with it
}

Thanks!
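
For reference, copying values "one by one" boils down to a loop over the class's declared fields. Below is a minimal JDK-only sketch of that loop, not an Avro API: the User class and the PojoFieldCopier name are hypothetical, and a plain Map stands in for the GenericRecord (with Avro, the loop body would call record.put(name, value) instead).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

public class PojoFieldCopier {

    // Hypothetical POJO, used only for this illustration.
    static class User {
        private final String name;
        private final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Copies each non-static, non-transient field declared directly on the
    // object's class into a name-keyed map. Inherited fields are not visited.
    // The map is a stand-in for a GenericRecord (an assumption of this sketch).
    static Map<String, Object> toFieldMap(Object pojo) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (Field field : pojo.getClass().getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true); // allow reading private fields
                values.put(field.getName(), field.get(pojo));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> values = toFieldMap(new User("joe", 42));
        System.out.println("name = " + values.get("name"));
        System.out.println("age  = " + values.get("age"));
    }
}
```

The point of the question is to avoid writing this loop by hand for every POJO and let the library do the equivalent work from the class's schema.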

Joe asked Jan 05 '16 18:01



1 Answer

I wound up creating my own serializer in this instance:

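import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.common.errors.SerializationException;

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer {
   private final EncoderFactory encoderFactory = EncoderFactory.get();

   @Override
   protected byte[] serializeImpl(String subject, Object object) throws SerializationException {
      if (object == null) {
         return null;
      }

      //TODO: consider caching schemas
      Schema schema = null;
      try {
         // Derive the Avro schema from the POJO's class via reflection.
         schema = ReflectData.get().getSchema(object.getClass());
         // Register the schema under this subject and get its ID back.
         int schemaId = this.schemaRegistry.register(subject, schema);

         ByteArrayOutputStream out = new ByteArrayOutputStream();
         // Header: magic byte 0, then the schema ID as a 4-byte big-endian int.
         out.write(0);
         out.write(ByteBuffer.allocate(4).putInt(schemaId).array());

         // Body: the object's fields, written by a reflection-based datum writer.
         BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
         DatumWriter<Object> writer = new ReflectDatumWriter<>(schema);
         writer.write(object, encoder);
         encoder.flush();
         out.close();

         return out.toByteArray();
      } catch (IOException ioe) {
         throw new SerializationException("Error serializing Avro message", ioe);
      } catch (RestClientException rce) {
         throw new SerializationException("Error registering Avro schema: " + schema, rce);
      } catch (RuntimeException re) {
         throw new SerializationException("Error serializing Avro message", re);
      }
   }
}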
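
The two out.write(...) calls in the serializer put a small header in front of the Avro bytes: one zero byte followed by the schema ID as a 4-byte int. A minimal JDK-only sketch of that framing and how a reader could take it apart again; the WireFormatSketch class and its method names are made up for illustration and are not part of the Confluent library.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatSketch {
    static final byte MAGIC_BYTE = 0;
    static final int HEADER_SIZE = 1 + Integer.BYTES; // magic byte + 4-byte schema ID

    // Prepends the header to an already-encoded payload.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.remaining() < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a framed message");
        }
        return buffer.getInt();
    }

    // Returns the bytes after the header, after checking the header is valid.
    static byte[] payloadOf(byte[] message) {
        schemaIdOf(message);
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(7, new byte[] {1, 2});
        System.out.println("schema id: " + schemaIdOf(framed));
        System.out.println("payload:   " + Arrays.toString(payloadOf(framed)));
    }
}
```

A consumer needs the schema ID from this header to look up the writer's schema before it can decode the payload, which is why the serializer registers the schema first and only then writes the object.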
Joe answered Oct 17 '22 18:10