I want to send a Kafka message whose payload is a class that extends SpecificRecordBase; the class was generated by a Maven plugin.
One of the fields of my schema has the logical type timestamp-millis, which corresponds to java.time.Instant in the generated class.
The field is defined as follows:
{"name": "processingTime", "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
},
When I create an instance of this class and set the processing time,
setProcessingTime(RandomDate.randomInstant())
everything is OK, but when I run the program and try to send it to Kafka, I get the following error:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class poc.avroGenerated.AvroMeasurement to class poc.avroSerde.AvroSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Long (java.time.Instant and java.lang.Long are in module java.base of loader 'bootstrap')
Here's my custom serializer class:
@Override
public byte[] serialize(String topic, T data) {
    byte[] result = null;
    try {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);
        binaryEncoder.flush();
        byteArrayOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        LOGGER.error(e);
    }
    return result;
}
Use SpecificDatumWriter instead of GenericDatumWriter. With that one change, your custom serializer looks fine!
This is frequently a point of confusion. In the Java implementation, the "generic" datum classes do not apply any of the customizations built into a specific record, including logical-type conversions. So GenericDatumWriter sees the raw java.time.Instant where the underlying schema type says long, and the cast fails; SpecificDatumWriter consults the conversions registered on the generated SpecificRecordBase subclass and encodes the Instant as epoch millis.
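For reference, here is a sketch of the fixed serializer with that one substitution applied. The class name AvroSerializer and the Kafka Serializer interface are assumed from the question's error message; the logger is omitted in favor of rethrowing, which you may or may not want.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.serialization.Serializer;

public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            // SpecificDatumWriter applies the logical-type conversions carried
            // by the generated record, so Instant is written as a long.
            DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema());
            writer.write(data, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed for topic " + topic, e);
        }
    }
}
```

Returning null for a null record (tombstone) and rethrowing instead of swallowing the IOException are common conventions for Kafka serializers, but the only change required to fix the ClassCastException is the SpecificDatumWriter line.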