Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Kafka JsonDesirialization MessageConversionException failed to resolve class name Class not found

I have two services that should communicate via Kafka. Let's call the first service WriteService and the second service QueryService.

On the WriteService side, I have the following configuration for producers.

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

I am trying to send an object of the class com.example.project.web.routes.dto.RouteDto

On the QueryService side, the consumer configuration is defined as follows.

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

The listener has the following definition. The payload class has the fully qualified name - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                               @Payload RouteDto payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }

When a message is sent, I get the following error

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.project.web.routes.dto.RouteDto]; nested exception is java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

The error is clear enough. However I cannot understand why does it has this behaviour by default. I don't expect to have the same package in different services, that makes no sense at all.

I haven't found a way to disable this and use a class provided to the listener, annotated with @Payload

How this could be solved, without manually configuring the mapper?

like image 209
CROSP Avatar asked Feb 14 '19 12:02

CROSP


1 Answers

If you are using spring-kafka-2.2.x you can disable the default header by overloaded constructors of JsonDeserializer docs

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

If with lower version use the MessageConverter (you might see this problem from spring-kafka-2.1.x and above)

Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter and BytesJsonMessageConverter customization. You can inject the MessageConverter into a KafkaTemplate instance directly and by using AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
 public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
  }

  @KafkaListener(topics = "jsonData",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void jsonListener(RouteDto dto) {
     ...
   }

Note :This type inference can be achieved only when the @KafkaListener annotation is declared at the method level. With a class-level @KafkaListener, the payload type is used to select which @KafkaHandler method to invoke, so it must already have been converted before the method can be chosen.

like image 160
Deadpool Avatar answered Nov 03 '22 23:11

Deadpool