I'm trying to consume Kafka messages in Avro format, but I'm not able to decode the messages from Avro to JSON in Go.
I'm using the Confluent Platform (3.0.1). For example, I produce Avro messages like this:
kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}
Now I consume the messages with the Go Kafka library sarama. Plain-text messages work fine, but Avro messages have to be decoded. I found two libraries for that: github.com/linkedin/goavro and github.com/elodina/go-avro.
But after decoding I get JSON without values (with both libraries):
{"f1":""}
goavro:
avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
// Decode the raw message bytes with the same schema the producer used.
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
if err != nil {
    log.Fatal(err)
}
log.Println(fmt.Sprintf("%s", decoded))
go-avro:
schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
if err := reader.Read(decodedRecord, decoder); err != nil {
    log.Fatal(err)
}
log.Println(decodedRecord.String())
Here msg is a *sarama.ConsumerMessage delivered by sarama, and msg.Value contains the raw message bytes.
The first byte of the message value is a magic byte (0). The following 4 bytes are the Avro schema ID, which is only really useful if you use the Confluent Schema Registry. The Avro-encoded data only starts after that 5-byte header, which is why decoding the whole value yields empty fields.
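Stripping that 5-byte header before handing the bytes to the Avro decoder makes the values show up. A minimal sketch with goavro (decodeConfluentAvro is a hypothetical helper; the schema is hard-coded here instead of being fetched from the registry by its ID, which is a simplification):

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "log"

    "github.com/linkedin/goavro"
)

func decodeConfluentAvro(value []byte) {
    if len(value) < 5 || value[0] != 0 {
        log.Fatal("not a Confluent-framed Avro message")
    }
    // Bytes 1-4 hold the schema ID as a big-endian uint32.
    schemaID := binary.BigEndian.Uint32(value[1:5])
    log.Printf("schema ID: %d", schemaID)

    // With the Schema Registry you would look the schema up by this ID;
    // here it is hard-coded to match the producer's schema.
    codec, err := goavro.NewCodec(`{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}`)
    if err != nil {
        log.Fatal(err)
    }

    // The Avro payload starts after the 5-byte header.
    decoded, err := codec.Decode(bytes.NewBuffer(value[5:]))
    if err != nil {
        log.Fatal(err)
    }
    log.Println(fmt.Sprintf("%s", decoded))
}

Calling decodeConfluentAvro(msg.Value) from the consumer loop should then print the record with the actual f1 value instead of an empty string.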