
Apache Kafka and Avro: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer

Whenever I try to read a message from the Kafka topic, I get the following exception:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Kafka Producer Code:

public class AvroSpecificProducer {
    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            // Report a failed send instead of reading metadata that may be missing.
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: "+ recordMetaData.offset());
            System.out.println("Topic: "+ recordMetaData.topic());
            System.out.println("Partition: "+ recordMetaData.partition());
            System.out.println("Timestamp: "+ recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );

        asyncSend(record1);

        Thread.sleep(1000);
    }
}

Kafka Consumer Code:

public class AvroSpecificDeserializer {

    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        VerifiableProperties properties = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("NewTopic", 1);

        ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        KafkaStream stream = consumerMap.get("NewTopic").get(0);
        ConsumerIterator it = stream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (it.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata messageAndMetadata = it.next();
            String key = (String) messageAndMetadata.key();
            GenericRecord record = (GenericRecord) messageAndMetadata.message();
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }

    }

    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}

I am following these examples:

  1. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
  2. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
Harmeet Singh Taara asked Feb 13 '17 09:02



2 Answers

This is the final working code, arrived at after discussing with @harmeen:

static {
    // Old (ZooKeeper-based) consumer: "smallest" starts from the beginning of the topic.
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    kafkaProps.put("zookeeper.connect", "localhost:2181");
    kafkaProps.put("schema.registry.url", "http://localhost:8081");
    // Ask the decoder for the generated specific class rather than a plain GenericData.Record.
    kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public static void infiniteConsumer() throws IOException {
    VerifiableProperties properties = new VerifiableProperties(kafkaProps);
    // Keys are plain strings; values are Avro records.
    StringDecoder keyDecoder = new StringDecoder(properties);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put("BrandNewTopics", 1);

    ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
    Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream stream = consumerMap.get("BrandNewTopics").get(0);
    ConsumerIterator it = stream.iterator();

    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        String key = (String) messageAndMetadata.key();
        GenericRecord record = (GenericRecord) messageAndMetadata.message();
        Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
        System.out.println("Key: " + key);
        System.out.println("Value: " + customer);
    }
}

Things that changed:

  • Setting the SPECIFIC_AVRO_READER_CONFIG property to true.
  • Setting auto.offset.reset to smallest to start from the beginning of the topic.
  • Using String (de)serialization for keys: StringSerializer in the producer and StringDecoder in the consumer shown above.
  • Changing both the producer and the consumer to reflect the previous change.
  • Adjusting the namespace of the Customer class that represents the Avro record.
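
For reference, here is a rough sketch of how the settings listed above line up on both sides. It uses only java.util.Properties with plain string keys so that it runs without Kafka on the classpath; the class and method names (AvroClientConfigSketch, producerProps, consumerProps) are made up for illustration, and passing "true" as a string rather than a Boolean is my assumption about what the config parser accepts.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings discussed above in one place.
final class AvroClientConfigSketch {

    // Producer side: String keys, Avro values (serializer classes given by name).
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    // Consumer side, for the old ZooKeeper-based consumer used in the code above.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("group.id", "CustomerCountryGroup1");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("auto.offset.reset", "smallest");       // read from the beginning of the topic
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "true");        // assumption: string form of the flag
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println("producer " + k + "=" + v));
        consumerProps().forEach((k, v) -> System.out.println("consumer " + k + "=" + v));
    }
}
```

The point of keeping both in one place is that the key type and the schema registry URL have to agree between producer and consumer; a mismatch on either side is easy to miss when the two live in separate classes.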
Javier Holguera answered Oct 10 '22 06:10



I ran into this issue too. The root cause is that the Avro deserializer expects the class to be in the same package as it was for the serializer. For example, if your producer uses a.b.c.AvroSerializer, your consumer should use a.b.c.AvroDeserializer, that is, the same package structure. Simply keeping the package structure identical on both sides resolved the issue for me.
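
To illustrate why the package matters: as far as I understand it, the specific reader finds the Java class from the schema's full name (namespace plus record name) and falls back to a generic record when no class with that name is on the classpath. The snippet below is a simplified stand-in for that lookup, not Avro's actual code. NamespaceLookupSketch and resolve are hypothetical names, and java.util.ArrayList merely stands in for a generated class such as Customer.

```java
import java.util.Optional;

// Hypothetical, simplified model of "find the class for this schema name".
final class NamespaceLookupSketch {

    // Builds the full name from namespace and record name, then tries to load it.
    static Optional<Class<?>> resolve(String namespace, String name) {
        String fullName = namespace.isEmpty() ? name : namespace + "." + name;
        try {
            return Optional.of(Class.forName(fullName));
        } catch (ClassNotFoundException e) {
            // No such class: a real reader would fall back to a generic record here.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Namespace matches the package of an existing class: lookup succeeds.
        System.out.println("java.util / ArrayList found: "
                + resolve("java.util", "ArrayList").isPresent());
        // Same record name, different namespace: lookup fails.
        System.out.println("com.example.wrong / ArrayList found: "
                + resolve("com.example.wrong", "ArrayList").isPresent());
    }
}
```

If the namespace in the schema and the package of the generated class differ, the second case applies, and casting the resulting generic record to the specific class would fail in the way the question describes.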

Prakhya answered Oct 10 '22 06:10
