KafkaException: Could not instantiate class JsonDeserializer

I'm trying to start a Spring Boot application with a Kafka client, following this reference guide, and I'm getting the error below.
Could you please advise how to fix it?

@Bean
public Map<String, Object> consumerConfig() {
    final HashMap<String, Object> result = new HashMap<>();
    result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap.servers"));
    result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return result;
}

@Bean
public ConsumerFactory<Long, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

@Bean
ConcurrentKafkaListenerContainerFactory<Long, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(3);
    containerFactory.getContainerProperties().setPollTimeout(3000);
    return containerFactory;
}

--

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:545) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at com.ubs.wma.bmss.BmssConsumerApp.main(BmssConsumerApp.java:12) [classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553) ~[kafka-clients-0.10.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:124) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    ... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.springframework.kafka.support.serializer.JsonDeserializer
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:316) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:203) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632) ~[kafka-clients-0.10.1.1.jar:na]
    ... 24 common frames omitted
Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121]
    at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121]
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na]
    ... 26 common frames omitted
asked Jan 05 '23 11:01 by Mike


1 Answer

According to that documentation:

for more complex or particular cases, the KafkaConsumer, and therefore KafkaProducer, provides overloaded constructors to accept (De)Serializer instances for keys and/or values, respectively.

To meet this API, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory also provide properties to allow to inject a custom (De)Serializer to target Producer/Consumer.

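And, further, from the Apache Kafka Javadocs (shown here for KafkaProducer; KafkaConsumer has the analogous constructor that accepts Deserializer instances):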

/**
 * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
 * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
 * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
 * either the string "42" or the integer 42).
 * @param configs   The producer configs
 * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
 *                       called in the producer when the serializer is passed in directly.
 * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
 *                         be called in the producer when the serializer is passed in directly.
 */
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {

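So what you need is something like this:

@Bean
public ConsumerFactory<Long, Foo> consumerFactory() {
    // Foo stands for your own payload class; the listener container factory's value type must match it.
    // A null key deserializer falls back to KEY_DESERIALIZER_CLASS_CONFIG (LongDeserializer) from consumerConfig().
    return new DefaultKafkaConsumerFactory<>(consumerConfig(), null, new JsonDeserializer<>(Foo.class));
}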

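The problem is that JsonDeserializer can't be instantiated by reflection: it needs a particular target type to deserialize to, so in this version its no-argument constructor is protected. That is why Kafka's Utils.newInstance fails with the IllegalAccessException at the bottom of your stack trace when you pass only the class in the config map.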
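
To see the mechanism in isolation, here is a minimal sketch that needs neither Kafka nor Spring. It is not the Kafka code itself: the class name ReflectiveInstantiationDemo and the helper tryReflectively are made up for illustration, and java.io.ObjectOutputStream is used only as a stand-in because it is a JDK class that also happens to have a protected no-argument constructor.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class ReflectiveInstantiationDemo {

    /**
     * Hypothetical helper: calls the no-arg constructor reflectively, roughly
     * what happens when only a Class is put into the consumer config map.
     * Returns "ok" on success, otherwise the simple name of the exception.
     */
    public static String tryReflectively(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        // Public no-arg constructor: reflection can create the instance.
        System.out.println(tryReflectively(StringBuilder.class));      // prints "ok"

        // Protected no-arg constructor, caller in another package and not a
        // subclass: the reflective call is rejected.
        System.out.println(tryReflectively(ObjectOutputStream.class)); // prints "IllegalAccessException"

        // Building the instance yourself through a public constructor works,
        // which is the same idea as handing a ready-made JsonDeserializer
        // to DefaultKafkaConsumerFactory.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeInt(42);
        }
    }
}
```

The second call fails for the same reason as in the stack trace: the reflective caller is outside the target class's package and is not a subclass, so a protected constructor is out of reach. Passing an instance sidesteps the reflective call entirely.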

answered Jan 07 '23 01:01 by Artem Bilan