Whenever I try to read a message from the Kafka queue, I get the following exception:
[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Kafka Producer Code:
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducer {
    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );
        asyncSend(record1);
        Thread.sleep(1000);
    }
}
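One thing to watch in the asyncSend callback above: if the send fails, the exception argument is non-null and (on these client versions) the metadata can be null, so reading recordMetaData unconditionally can throw a NullPointerException that hides the real error. A defensive variant of the same callback, as a sketch:

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                // Send failed: report the real error; recordMetaData may be null here.
                ex.printStackTrace();
            } else {
                System.out.println("Offset: " + recordMetaData.offset());
                System.out.println("Topic: " + recordMetaData.topic());
                System.out.println("Partition: " + recordMetaData.partition());
                System.out.println("Timestamp: " + recordMetaData.timestamp());
            }
        });
    }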
Kafka Consumer Code:
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.VerifiableProperties;

import static kafka.consumer.Consumer.createJavaConsumerConnector;

public class AvroSpecificDeserializer {
    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        VerifiableProperties properties = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("NewTopic", 1);

        ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        KafkaStream stream = consumerMap.get("NewTopic").get(0);
        ConsumerIterator it = stream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (it.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata messageAndMetadata = it.next();
            String key = (String) messageAndMetadata.key();
            GenericRecord record = (GenericRecord) messageAndMetadata.message();
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }
    }

    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}
I am following these examples:
This is the final code that works, after discussing with @harmeen:
// Same imports as AvroSpecificDeserializer above, plus kafka.serializer.StringDecoder
// and io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.
static {
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    kafkaProps.put("zookeeper.connect", "localhost:2181");
    kafkaProps.put("schema.registry.url", "http://localhost:8081");
    kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public static void infiniteConsumer() throws IOException {
    VerifiableProperties properties = new VerifiableProperties(kafkaProps);
    StringDecoder keyDecoder = new StringDecoder(properties);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put("BrandNewTopics", 1);

    ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
    Map<String, List<KafkaStream<String, Object>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream stream = consumerMap.get("BrandNewTopics").get(0);
    ConsumerIterator it = stream.iterator();

    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        String key = (String) messageAndMetadata.key();
        GenericRecord record = (GenericRecord) messageAndMetadata.message();
        Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
        System.out.println("Key: " + key);
        System.out.println("Value: " + customer);
    }
}
Things that got changed:

- Set the SPECIFIC_AVRO_READER_CONFIG property to true.
- Switched the keys to StringSerializer on the producer and StringDecoder on the consumer.
- Kept the generated Customer class that represents the Avro record.

I hit this issue as well. The root cause is that the Avro deserializer expects the class to be in the same package on the consumer side as it was on the serializer side. For example, if in your producer you used a.b.c.AvroSerializer, then the a.b.c.AvroDeserializer in your consumer should use the same package structure. Just keeping the same package structure resolved the issue for me.
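As a side note, the ConsumerConnector/KafkaStream API used above is the old high-level consumer, which has since been deprecated and removed from Kafka. A minimal sketch of the same specific-reader setup on the newer KafkaConsumer API follows; the class name AvroSpecificConsumerSketch is made up here, while the topic, group, and generated Customer class are the ones from the answer above:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

    public class AvroSpecificConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // "smallest" in the old API
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put("schema.registry.url", "http://localhost:8081");
            // Without this flag the deserializer hands back GenericData$Record,
            // which is exactly the ClassCastException from the question.
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

            try (KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("BrandNewTopics"));
                while (true) {
                    ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, Customer> record : records) {
                        System.out.println("Key: " + record.key());
                        System.out.println("Value: " + record.value());
                    }
                }
            }
        }
    }

With specific.avro.reader set, the Confluent deserializer returns instances of the generated SpecificRecord class directly, so no GenericRecord cast or SpecificData.deepCopy is needed at all.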