I recently noticed that Camel now has its own Kafka component, so I decided to give it a whirl.
I tried a nice simple file -> Kafka topic route, as follows:
<route>
  <from uri="file:///tmp/input" />
  <setHeader headerName="kafka.PARTITION_KEY">
    <constant>Test</constant>
  </setHeader>
  <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" />
</route>
This seems simple enough. However, on running it I get:
java.lang.ClassCastException: java.lang.String cannot be cast to [B
at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
On inspecting the Camel code, I can see it does the following:
String msg = exchange.getIn().getBody(String.class);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);
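This is clearly a serialisation problem: the stack trace shows Kafka's DefaultEncoder being handed a String where it expects a byte array ([B). I'm just not sure whether there is a workaround or whether this is a bug in the existing implementation (or, hopefully, just my misunderstanding).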
Any suggestions? Thanks, J
Ah, never mind, here we go. Hope this helps someone else: you have to set the serialiser in the endpoint options, using serializerClass=kafka.serializer.StringEncoder.
<route>
  <from uri="file:///tmp/input" />
  <setHeader headerName="kafka.PARTITION_KEY">
    <constant>Test</constant>
  </setHeader>
  <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" />
</route>
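For anyone wondering why the serialiser option makes the difference, here is a rough sketch of what I believe is going on. It does not use Kafka or Camel at all: the Encoder interface and the PassThroughEncoder and Utf8StringEncoder classes are made-up stand-ins for the encoders named in the stack trace and in the fixed route, so treat it as an illustration of the cast failure rather than the real implementation. The assumption is that the encoder is picked from configuration at runtime, so nothing checks at compile time that it matches the message type.

```java
import java.nio.charset.StandardCharsets;

public class EncoderDemo {
    // Hypothetical stand-in for an encoder contract; NOT the real kafka.serializer API.
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    // Stand-in for a default encoder that only accepts byte arrays and passes them through.
    static class PassThroughEncoder implements Encoder<byte[]> {
        public byte[] toBytes(byte[] value) {
            return value;
        }
    }

    // Stand-in for a string encoder that converts text to UTF-8 bytes.
    static class Utf8StringEncoder implements Encoder<String> {
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // The encoder comes from configuration, so the call is unchecked:
    // a mismatch between encoder and message type only shows up at runtime.
    @SuppressWarnings("unchecked")
    static byte[] send(Encoder<?> configured, Object message) {
        return ((Encoder<Object>) configured).toBytes(message);
    }

    // Returns true if sending the message through the encoder throws ClassCastException.
    static boolean failsWithClassCast(Encoder<?> encoder, Object message) {
        try {
            send(encoder, message);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A String handed to the byte-array encoder fails the cast to byte[].
        System.out.println(failsWithClassCast(new PassThroughEncoder(), "hello")); // prints "true"
        // The same String handed to the string encoder is converted without error.
        System.out.println(failsWithClassCast(new Utf8StringEncoder(), "hello")); // prints "false"
    }
}
```

The Camel producer code quoted in the question always builds a String message, so the configured encoder has to be one that accepts Strings, which is what serializerClass=kafka.serializer.StringEncoder provides.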