I am sending my own class as a Kafka message; the class has a bunch of String fields.
I therefore cannot use the default serializer class or the StringSerializer that comes with the Kafka library.
I guess I need to write my own serializer and feed it to the producer properties?
EDIT
In newer Kafka clients, implement Serializer rather than Encoder.
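For the newer client API mentioned in the edit, a minimal sketch might look like the following. To stay compilable without the Kafka jar, it declares a local stand-in interface that mirrors the shape of org.apache.kafka.common.serialization.Serializer; in a real project you would import Kafka's interface instead. MyMessage, its fields, and the tab-separated wire format are assumptions made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Local stand-in that mirrors the shape of Kafka's
// org.apache.kafka.common.serialization.Serializer, so the sketch
// compiles on its own. Replace it with the real import in a project.
interface Serializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {}

    byte[] serialize(String topic, T data);

    default void close() {}
}

// Hypothetical message type with String fields (not from the question).
final class MyMessage {
    final String id;
    final String body;

    MyMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

// Serializes a MyMessage as "id<TAB>body" in UTF-8.
// This naive format is ambiguous if a field itself contains a tab.
final class MyMessageSerializer implements Serializer<MyMessage> {
    @Override
    public byte[] serialize(String topic, MyMessage data) {
        if (data == null) {
            return null; // pass null payloads through unchanged
        }
        String line = data.id + "\t" + data.body;
        return line.getBytes(StandardCharsets.UTF_8);
    }
}
```

With the newer producer, the class name would be supplied under the value.serializer property (or key.serializer for keys) rather than serializer.class.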
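With the older Encoder API, the things required for writing a custom serializer are:
1. Implement Encoder with an object specified for the generic.
2. Provide a constructor that takes VerifiableProperties; it is required.
3. Override the toBytes(...) method, making sure a byte array is returned.
4. Register the serializer class in ProducerConfig.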
As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.
If you follow Kafka's Producer Example, you will construct ProducerConfig via a Properties object. When building your properties, be sure to include:
props.put("serializer.class", "path.to.your.CustomSerializer");
where the value is the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.
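Put together, the producer properties might be assembled as in the sketch below. The ProducerProps helper and the broker address are hypothetical; metadata.broker.list, serializer.class, key.serializer.class and request.required.acks are property names of the old (0.8-era) producer that ProducerConfig belongs to.

```java
import java.util.Properties;

// Hypothetical helper that collects the settings for the old producer API.
final class ProducerProps {
    static Properties build(String brokerList, String serializerClass) {
        Properties props = new Properties();
        // Brokers used to bootstrap cluster metadata, e.g. "localhost:9092".
        props.put("metadata.broker.list", brokerList);
        // Fully qualified name of the Encoder used for message payloads.
        props.put("serializer.class", serializerClass);
        // Keys default to the message serializer; set this explicitly
        // when keys are plain strings but messages are a custom type.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the leader to acknowledge each write.
        props.put("request.required.acks", "1");
        return props;
    }
}

// With the Kafka 0.8 jar on the classpath, the properties would then be used as:
//   ProducerConfig config = new ProducerConfig(props);
//   Producer<String, Object> producer = new Producer<>(config);
```

Keeping the serializer name as a parameter makes it easy to swap encoders without touching the rest of the configuration.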
Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing traits in Java is weird, but the following worked for serializing JSON in my project:
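// Imports assume Jackson 2.x and log4j 1.x; adjust to the libraries your project uses.
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);

    // Instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present: Kafka instantiates the encoder reflectively with it. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            // Use an explicit charset so the output does not depend on the platform default.
            return objectMapper.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        // Fall back to an empty payload when serialization fails.
        return new byte[0];
    }
}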
Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:
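package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {

    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present: Kafka instantiates the encoder reflectively with it. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        // Delegate to the message, which knows how to turn itself into bytes.
        return customMessage.toBytes();
    }
}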
Which would leave your property config looking like this:
props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
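The encoder above delegates to customMessage.toBytes(), which is left undefined here. One possible shape for it, assuming a message made of three String fields, is sketched below. The field names and the fromBytes helper are hypothetical, and the length-prefixed layout produced by DataOutputStream.writeUTF is just one option among many.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical message type; the field names are made up for illustration.
final class CustomMessage {
    final String sender;
    final String subject;
    final String body;

    CustomMessage(String sender, String subject, String body) {
        this.sender = sender;
        this.subject = subject;
        this.body = body;
    }

    // Writes each field as a 2-byte length followed by modified UTF-8 bytes.
    // writeUTF rejects strings whose encoded form exceeds 65535 bytes.
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(sender);
            out.writeUTF(subject);
            out.writeUTF(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static CustomMessage fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new CustomMessage(in.readUTF(), in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Length prefixes keep the fields unambiguous even when they contain delimiters such as tabs or newlines, and a matching fromBytes gives the consumer side a decoder with the same layout.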