I have a Kafka consumer application, written with Spring Boot 2.0.2. When I receive the message in my listener, I get the following error:
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.test.demo.domain.Account]; nested exception is java.lang.ClassNotFoundException: com.test.demo.domain.Account
The class name of the object in the Producer is "com.test.demo.domain.Account" but I have a different package and class name in the consumer.
When I re-package the consumer's class name to match the producers, everything works ok. However, I believe I shouldn't have to do this.
Does anyone know the issue?
==== UPDATED ====
My Producer code:
@Bean public ProducerFactory<String, Account> accountProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
return new DefaultKafkaProducerFactory<>(configProps); }
@Bean public KafkaTemplate<String, Account> accountKafkaTemplate() {
ProducerFactory<String, Account> factory = accountProducerFactory();
return new KafkaTemplate<>(factory); }
Consumer code:
public ConsumerFactory<String, Account> accountConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Account> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(accountConsumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
Exception:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.kconsumer.accountconsumer.service.AccountConsumer.accountListener(com.kconsumer.accountconsumer.domain.Account)]
Bean [com.kconsumer.accountconsumer.service.AccountConsumer@444cc791]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.kconsumer.accountconsumer.domain.Account] for GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}], failedMessage=GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:257)
If you are using @KafkaListener
, use the StringDeserializer
or a ByteArrayDserializer
and add a StringJsonMessageConverter
@Bean
to the application context.
Then...
@KafkaListener(...)
public void listen(Account account) {
...
}
...the required type of the account is passed to the converter.
See the documentation.
EDIT
You don't need a connection factory, boot will detect the converter and wire it in for you.
Here's an example:
@SpringBootApplication
public class So50478267Application {
public static void main(String[] args) {
SpringApplication.run(So50478267Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Account1> template) {
return args -> template.send("so50478267", new Account1("foo.inc"));
}
@KafkaListener(id = "listen", topics = "so50478267")
public void listen(Account2 account) {
System.out.println(account);
}
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
@Bean
public NewTopic topic() {
return new NewTopic("so50478267", 1, (short) 1);
}
public static class Account1 {
private final String customer;
public Account1(String customer) {
this.customer = customer;
}
public String getCustomer() {
return this.customer;
}
}
public static class Account2 {
private String customer;
public Account2() {
super();
}
public String getCustomer() {
return this.customer;
}
public void setCustomer(String customer) {
this.customer = customer;
}
@Override
public String toString() {
return "Account2 [customer=" + this.customer + "]";
}
}
}
and
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
How did you deserialize your Kafka record object ? If you use deserializers from spring-kafka API such as StringDeserializer, JsonDeserializer.. I think it's not possible to change record object's package name.
If you write custom Serdes for Kafka record yourself then you can do it in deserialization logic by overriding method readClassDescriptor() of ObjectInputStream. Please find the sample code here.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With