
ClassNotFoundException with Kafka consumer

I have a Kafka consumer application, written with Spring Boot 2.0.2. When I receive the message in my listener, I get the following error:

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.test.demo.domain.Account]; nested exception is java.lang.ClassNotFoundException: com.test.demo.domain.Account

The class name of the object in the Producer is "com.test.demo.domain.Account" but I have a different package and class name in the consumer.

When I repackage the consumer's class so that its name matches the producer's, everything works. However, I believe I shouldn't have to do this.

Does anyone know the issue?

==== UPDATED ====

My Producer code:

@Bean
public ProducerFactory<String, Account> accountProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Account> accountKafkaTemplate() {
    ProducerFactory<String, Account> factory = accountProducerFactory();
    return new KafkaTemplate<>(factory);
}

Consumer code:

public ConsumerFactory<String, Account> accountConsumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Account> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(accountConsumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

Exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.kconsumer.accountconsumer.service.AccountConsumer.accountListener(com.kconsumer.accountconsumer.domain.Account)]
Bean [com.kconsumer.accountconsumer.service.AccountConsumer@444cc791]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.kconsumer.accountconsumer.domain.Account] for GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}], failedMessage=GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:257)

Joe P asked May 23 '18 00:05


2 Answers

If you are using @KafkaListener, use the StringDeserializer or a ByteArrayDeserializer and add a StringJsonMessageConverter @Bean to the application context.

Then...

@KafkaListener(...)
public void listen(Account account) {
    ...
}

...the required type of the account is passed to the converter.

See the documentation.

EDIT

You don't need a container factory; Boot will detect the converter and wire it in for you.

Here's an example:

@SpringBootApplication
public class So50478267Application {

    public static void main(String[] args) {
        SpringApplication.run(So50478267Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Account1> template) {
        return args -> template.send("so50478267", new Account1("foo.inc"));
    }

    @KafkaListener(id = "listen", topics = "so50478267")
    public void listen(Account2 account) {
        System.out.println(account);
    }

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50478267", 1, (short) 1);
    }

    public static class Account1 {

        private final String customer;

        public Account1(String customer) {
            this.customer = customer;
        }

        public String getCustomer() {
            return this.customer;
        }

    }

    public static class Account2 {

        private String customer;

        public Account2() {
            super();
        }

        public String getCustomer() {
            return this.customer;
        }

        public void setCustomer(String customer) {
            this.customer = customer;
        }

        @Override
        public String toString() {
            return "Account2 [customer=" + this.customer + "]";
        }

    }

}

and

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Gary Russell answered Sep 20 '22 11:09


How did you deserialize your Kafka record object? If you use stock deserializers such as StringDeserializer (from kafka-clients) or JsonDeserializer (from spring-kafka), I don't think it's possible to change the record object's package name.

If you write a custom Serde for the Kafka record yourself, you can handle this in the deserialization logic by overriding the readClassDescriptor() method of ObjectInputStream. Please find the sample code here.
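
As a rough illustration of the readClassDescriptor() idea, here is a minimal sketch. It assumes the payload was written with Java native serialization (ObjectOutputStream), which is not the JSON payload shown in the question, and that both classes declare the same fields and the same serialVersionUID. The names RenamedClassDemo, ProducerAccount, ConsumerAccount and RemappingObjectInputStream are hypothetical and exist only for this example; a real custom deserializer would call something like deserialize() from its own code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class RenamedClassDemo {

    /** Hypothetical stand-in for the class on the producer side. */
    static class ProducerAccount implements Serializable {
        private static final long serialVersionUID = 1L;
        String name;

        ProducerAccount(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for the differently named class on the consumer side. */
    static class ConsumerAccount implements Serializable {
        private static final long serialVersionUID = 1L; // assumed to match the producer's
        String name;                                     // assumed to match the producer's field
    }

    /** Swaps the class descriptor read from the stream for the local class's descriptor. */
    static class RemappingObjectInputStream extends ObjectInputStream {

        RemappingObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass streamDescriptor = super.readClassDescriptor();
            if (streamDescriptor.getName().equals(ProducerAccount.class.getName())) {
                // Map the producer's class name onto the consumer's class.
                return ObjectStreamClass.lookup(ConsumerAccount.class);
            }
            return streamDescriptor;
        }
    }

    /** Serializes a value the way a producer using native serialization would. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Reads the bytes back as the consumer-side class. */
    static ConsumerAccount deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new RemappingObjectInputStream(new ByteArrayInputStream(data))) {
            return (ConsumerAccount) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialize(new ProducerAccount("test-400"));
        ConsumerAccount account = deserialize(payload);
        System.out.println(account.name);
    }
}
```

The substitution only works while the two classes stay structurally compatible; if the field layouts drift apart, the stream data no longer lines up with the local descriptor. For JSON payloads like the one in the question, a message converter that takes the target type from the listener method avoids this class-name coupling altogether.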

Quang Vien answered Sep 18 '22 11:09