 

Kafka Serialization of an object [duplicate]

I started playing with Kafka. I've set up a ZooKeeper configuration, and I managed to send and consume String messages. Now I am trying to pass an object (in Java), but for some reason, when parsing the message in the consumer I have header issues. I tried several serialization options (using Decoder/Encoder), and all of them return the same header issue.

Here is my code. The producer:

        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");
        ProducerConfig config = new ProducerConfig(props);
        Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
        ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails()));
        try {
           producer.send(data);
        } finally {
           producer.close();
        }

And the consumer:

        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // create 4 partitions of the stream for topic "test3", to allow 4 threads to consume
        Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
                consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
        List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");

        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // consume the messages in the threads
        for (final KafkaMessageStream<EventDetails> stream: streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for(EventDetails event: stream) {
                        System.err.println("********** Got message: " + event.toString());
                    }
                }
            });
        }

and my Serializer:

public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {
    public Message toMessage(EventDetails eventDetails) {
        try {
            ObjectMapper mapper = new ObjectMapper(new SmileFactory());
            byte[] serialized = mapper.writeValueAsBytes(eventDetails);
            return new Message(serialized);
        } catch (IOException e) {
            e.printStackTrace();
            return null;   // TODO
        }
    }
    public EventDetails toEvent(Message message) {
        EventDetails event = new EventDetails();

        ObjectMapper mapper = new ObjectMapper(new SmileFactory());
        try {
            //TODO handle error
            return mapper.readValue(message.payload().array(), EventDetails.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }

    }
}

And this is the error I get:

org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse
 at [Source: N/A; line: -1, column: -1]

When I worked with MessagePack and with plain writing to an ObjectOutputStream I got a similar header issue. I also tried adding the payload CRC32 to the message, but that didn't help either.

What am I doing wrong here?

asked Jul 15 '12 by krakover

2 Answers

Hm, I haven't run into the same header issue you are encountering, but my project wasn't compiling correctly when I didn't provide a VerifiableProperties constructor in my encoder/decoder. It seems strange that the missing constructor would corrupt Jackson's deserialization, though.

Perhaps try splitting up your encoder and decoder and include the VerifiableProperties constructor in both; you shouldn't need to implement Decoder[T] for serialization. I was able to successfully implement JSON de/serialization using ObjectMapper following the format in this post.
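For reference, here is a minimal sketch of what that split might look like against the 0.8-style kafka.serializer API, which exposes toBytes/fromBytes and looks up the VerifiableProperties constructor reflectively. EventDetails and the Smile-backed ObjectMapper are taken from the question; the exact interface shape is an assumption about your Kafka version, so check it against the jars you're running:

```java
// Hedged sketch: a separate Encoder and Decoder, each carrying the
// VerifiableProperties constructor that Kafka's reflection expects.
import java.io.IOException;

import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;

public class EventDetailsEncoder implements Encoder<EventDetails> {
    private final ObjectMapper mapper = new ObjectMapper(new SmileFactory());

    // Kafka instantiates the encoder via this constructor signature.
    public EventDetailsEncoder(VerifiableProperties props) { }

    public byte[] toBytes(EventDetails event) {
        try {
            return mapper.writeValueAsBytes(event);
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize EventDetails", e);
        }
    }
}

class EventDetailsDecoder implements Decoder<EventDetails> {
    private final ObjectMapper mapper = new ObjectMapper(new SmileFactory());

    public EventDetailsDecoder(VerifiableProperties props) { }

    public EventDetails fromBytes(byte[] bytes) {
        try {
            return mapper.readValue(bytes, EventDetails.class);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize EventDetails", e);
        }
    }
}
```

You would then reference EventDetailsEncoder in the producer's serializer.class property and pass an EventDetailsDecoder when creating the consumer streams, rather than one class playing both roles.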

Good luck!

answered Sep 30 '22 by Sam Berry

ByteBuffer's .array() method is not reliable: whether a buffer is backed by an accessible array, and at what offset the content starts, depends on the particular implementation, so the returned array can include bytes that precede the payload. You might want to try copying exactly the remaining bytes instead:

    ByteBuffer bb = message.payload();
    byte[] b = new byte[bb.remaining()];
    bb.get(b, 0, b.length);
    return mapper.readValue(b, EventDetails.class);
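To see why .array() can hand Jackson the wrong bytes, here is a small self-contained demonstration (plain JDK, no Kafka; the 2-byte "header" merely simulates how a payload buffer can sit inside a larger message buffer): a sliced buffer shares its parent's backing array, so array() returns the header bytes too, while a remaining()-based copy extracts exactly the payload.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class PayloadExtraction {
    public static void main(String[] args) {
        // Simulated wire format: 2 header bytes followed by a 3-byte payload.
        byte[] wire = {0x1, 0x2, /* payload: */ 0x10, 0x20, 0x30};
        ByteBuffer whole = ByteBuffer.wrap(wire);
        whole.position(2);                  // skip the 2-byte "header"
        ByteBuffer payload = whole.slice(); // view covering only the payload

        // array() still exposes the entire backing array, header included:
        System.out.println(Arrays.toString(payload.array()));
        // -> [1, 2, 16, 32, 48]

        // Copying remaining() bytes yields exactly the payload:
        byte[] b = new byte[payload.remaining()];
        payload.get(b, 0, b.length);
        System.out.println(Arrays.toString(b));
        // -> [16, 32, 48]
    }
}
```

Feeding payload.array() to a parser is what produces a leading byte the Smile decoder doesn't recognize, which matches the "first byte = 0x0" in the asker's exception.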
answered Sep 30 '22 by questionersam