
Send Custom Java Objects to Kafka Topic

I have a custom Java object and want to use the JVM's built-in serialization to send it to a Kafka topic, but serialization fails with the error below:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.spring.kafka.Payload to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

Payload.java

import java.io.Serializable;

public class Payload implements Serializable {

    private static final long serialVersionUID = 123L;

    private String name = "vinod";

    private int anInt = 5;

    private Double aDouble = 5.0;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAnInt() {
        return anInt;
    }

    public void setAnInt(int anInt) {
        this.anInt = anInt;
    }

    public Double getaDouble() {
        return aDouble;
    }

    public void setaDouble(Double aDouble) {
        this.aDouble = aDouble;
    }

}

When creating the producer, I set the following properties:

<entry key="key.serializer"
       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
<entry key="value.serializer"
       value="org.apache.kafka.common.serialization.ByteArraySerializer" />

My send call is as follows:

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));

What is the correct way to send a custom Java object through a producer to a Kafka topic?

asked Dec 14 '16 11:12 by Vinod Jayachandran

1 Answer

There are two options:

1) To send custom Java objects through the producer, create a serializer that implements org.apache.kafka.common.serialization.Serializer and specify that class as the value.serializer when creating the producer.

Reference code:

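import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class PayloadSerializer implements Serializer<Payload> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed.
    }

    @Override
    public byte[] serialize(String topic, Payload payload) {
        if (payload == null) {
            return null; // pass null values through unchanged
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into baos.
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(payload);
        } catch (IOException e) {
            // Fail the send instead of silently publishing an empty message.
            throw new SerializationException("Failed to serialize Payload", e);
        }
        return baos.toByteArray();
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}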

Then set the value serializer accordingly:

<entry key="value.serializer"
       value="com.spring.kafka.PayloadSerializer" />
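
A consumer reading these messages has to reverse the same steps. The sketch below uses only the JDK to show the round trip that the serializer above starts. The class name RoundTripSketch and the trimmed-down Sample class are hypothetical stand-ins for illustration, not part of the original code, and no Kafka types are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripSketch {

    // Hypothetical stand-in for Payload, trimmed to a single field.
    static class Sample implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Sample(String name) {
            this.name = name;
        }
    }

    // Same steps as PayloadSerializer.serialize, without the Kafka types.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        }
        return baos.toByteArray();
    }

    // The reverse step that a consumer-side deserializer would perform.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = toBytes(new Sample("vinod"));
        Sample copy = (Sample) fromBytes(bytes);
        System.out.println(copy.name); // prints "vinod"
    }
}
```

Note that JVM serialization ties the consumer to the same class definition and serialVersionUID as the producer, which is one reason the second option suggests JSON.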

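2) Alternatively, skip the custom serializer class and keep the existing ByteArraySerializer. ByteArraySerializer only accepts byte[] values, which is why passing a Payload directly fails, so convert the object yourself before calling send:

Java object -> String (preferably a JSON representation rather than toString()) -> byte[]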
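
The conversion chain above can be sketched roughly as follows. The class PayloadJsonSketch and its methods are hypothetical names for illustration. The JSON is built by hand here only to keep the example dependency-free; in practice a JSON library would normally do this step. The escaping covers only backslashes and double quotes, which is an assumption about the data, not a full JSON encoder.

```java
import java.nio.charset.StandardCharsets;

public class PayloadJsonSketch {

    // Hand-built JSON for the three Payload fields (illustrative only).
    // Escapes backslashes and double quotes; control characters are not handled.
    static String toJson(String name, int anInt, double aDouble) {
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\":\"" + escaped + "\",\"anInt\":" + anInt
                + ",\"aDouble\":" + aDouble + "}";
    }

    // String -> byte[] with an explicit charset so producer and consumer agree.
    static byte[] toBytes(String name, int anInt, double aDouble) {
        return toJson(name, anInt, aDouble).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = toBytes("vinod", 5, 5.0);
        System.out.println(new String(value, StandardCharsets.UTF_8));
        // prints {"name":"vinod","anInt":5,"aDouble":5.0}

        // With a producer typed for byte[] keys and values, the send would look like:
        // kafkaProducer.send(new ProducerRecord<byte[], byte[]>("test", value));
    }
}
```

The record's value type has to be byte[] rather than Payload, since that is what ByteArraySerializer expects.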

answered Sep 25 '22 07:09 by Vinod Jayachandran
