
Camel Kafka Integration

I recently noticed that Camel now has its own component for Kafka, so I decided to give it a whirl.

I started with a simple file -> Kafka topic route, as follows...

<route>
    <from uri="file:///tmp/input" />
    <setHeader headerName="kafka.PARTITION_KEY">
        <constant>Test</constant>
    </setHeader>
    <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" />
</route>

This seems simple enough, however, on running this I get...

    java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)

On inspecting the Camel code, I found that it does the following...

String msg = exchange.getIn().getBody(String.class);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);

This is evidently a serialisation problem. I'm just not sure whether there is a workaround or whether this is inherently a bug in the existing implementation (or, hopefully, just my misunderstanding).

Any suggestions? Thanks, J

asked Sep 08 '14 16:09 by JayTee


1 Answer

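Ah, never mind, here we go... Hope this helps someone else. You have to set the serialiser in the endpoint options. The stack trace shows the producer using kafka.serializer.DefaultEncoder, which expects a byte[] payload, while the Camel producer hands it a String. Adding serializerClass=kafka.serializer.StringEncoder to the URI resolves the mismatch.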

<route>
    <from uri="file:///tmp/input" />
    <setHeader headerName="kafka.PARTITION_KEY">
        <constant>Test</constant>
    </setHeader>
    <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" />
</route>
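
For anyone wondering why the cast fails, here is a minimal, dependency-free sketch of the mismatch. It does not use the Kafka or Camel classes: `EncoderSketch`, its `Encoder` interface, `PassThroughEncoder`, and `StringEncoder` are hypothetical stand-ins I made up to mimic an encoder that expects byte[] and one that expects String. The assumption is that the real producer calls the configured encoder through an untyped reference, much like `encode` does here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins, not the Kafka classes: they only illustrate the type mismatch.
final class EncoderSketch {

    // Simplified encoder contract: turn a value of type T into bytes.
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    // Mimics an encoder that expects the payload to be byte[] already.
    static final class PassThroughEncoder implements Encoder<byte[]> {
        @Override
        public byte[] toBytes(byte[] value) {
            return value;
        }
    }

    // Mimics an encoder that accepts a String and converts it to bytes.
    static final class StringEncoder implements Encoder<String> {
        @Override
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Calls the encoder without knowing its type parameter, so a wrong payload
    // type is only detected at runtime, as a ClassCastException.
    @SuppressWarnings("unchecked")
    static byte[] encode(Encoder<?> encoder, Object payload) {
        return ((Encoder<Object>) encoder).toBytes(payload);
    }

    public static void main(String[] args) {
        String body = "hello";

        // A String payload with a String encoder works.
        byte[] bytes = encode(new StringEncoder(), body);
        System.out.println("String encoder produced " + bytes.length + " bytes");

        // A String payload with a byte[] encoder fails at the cast.
        try {
            encode(new PassThroughEncoder(), body);
        } catch (ClassCastException e) {
            System.out.println("byte[] encoder rejected the String: " + e.getMessage());
        }
    }
}
```

The second call fails the same way the route does without the serializerClass option: the payload is a String, but the encoder in use only accepts byte[].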

answered Oct 05 '22 06:10 by JayTee