Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Failed to start bean kafkaListenerContainer: java.lang.IllegalArgumentException

I am using this as an example for reading a file using Spring Integration. It works fine, but when I try to send the file to a Kafka producer it doesn't work. I tried to look up this issue on the internet but could not find help. Here is my code:

file: MessageProcessingIntegrationFlow.java:

/**
 * Builds the integration flow that reads from the inbound channel, upper-cases
 * the String payload, logs it, and hands it to the Kafka producer handler.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow writeToFile() {
return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
       // The payload is cast to String and upper-cased directly; wrapping it in a
       // StringBuilder and calling toString() first only made a redundant copy.
       .transform(m -> ((String) m).toUpperCase())
//                .handle(fileWritingMessageHandler)
    // NOTE(review): loggingHandler() is defined outside this snippet. If it is a
    // one-way handler (produces no reply), the flow cannot continue past it to
    // the Kafka handler below - confirm, and tap the log off the main flow if so.
    .handle(loggingHandler())
    .handle(kafkaProducerMessageHandler())
    .get();
}



 //producing channel
/**
 * Declares the direct channel registered under the bean name "kafkaChannel".
 *
 * @return a new {@code DirectChannel}
 */
@Bean(name="kafkaChannel")
public DirectChannel kafkaChannel() {
    DirectChannel producingChannel = new DirectChannel();
    return producingChannel;
}

/**
 * Declares the direct channel used as the output of the Kafka
 * message-driven channel adapter.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public DirectChannel consumingChannel() {
  DirectChannel inboundFromKafka = new DirectChannel();
  return inboundFromKafka;
}


    /**
     * Creates the handler that publishes messages to Kafka through
     * {@link #kafkaTemplate()}. It is wired as a service activator on
     * "kafkaChannel"; every record goes to the configured topic with the fixed
     * message key "kafka-integration".
     *
     * @return the configured Kafka producer message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "kafkaChannel")
    public MessageHandler kafkaProducerMessageHandler() {
        KafkaTemplate<String, String> template = kafkaTemplate();
        KafkaProducerMessageHandler<String, String> producerHandler =
                new KafkaProducerMessageHandler<>(template);

        // Topic and key are constants, so literal expressions are sufficient.
        LiteralExpression topic = new LiteralExpression(kafkaTopic);
        producerHandler.setTopicExpression(topic);
        LiteralExpression messageKey = new LiteralExpression("kafka-integration");
        producerHandler.setMessageKeyExpression(messageKey);

        return producerHandler;
    }

/**
 * Creates the Kafka template on top of {@link #producerFactory()}.
 *
 * @return a template for String keys and String values
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
  ProducerFactory<String, String> factory = producerFactory();
  return new KafkaTemplate<>(factory);
}

/**
 * Creates the producer factory from the settings in {@link #producerConfigs()}.
 *
 * @return a producer factory for String keys and String values
 */
@Bean
public ProducerFactory<String, String> producerFactory() {
  Map<String, Object> settings = producerConfigs();
  return new DefaultKafkaProducerFactory<>(settings);
}

/**
 * Assembles the Kafka producer settings: bootstrap servers, String
 * serializers for key and value, and a linger value of 1.
 *
 * @return a mutable map of producer configuration entries
 */
@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> config = new HashMap<>();

  // Where to connect.
  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  // Keys and values are both sent as plain strings.
  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  // Introduce a delay on the send to allow more messages to accumulate.
  config.put(ProducerConfig.LINGER_MS_CONFIG, 1);

  return config;
}

//consumer configuration....
/**
 * Creates the inbound adapter that wraps {@link #kafkaListenerContainer()}
 * and sends its output to {@link #consumingChannel()}.
 *
 * @return the configured message-driven channel adapter
 */
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
  ConcurrentMessageListenerContainer<String, String> container = kafkaListenerContainer();
  KafkaMessageDrivenChannelAdapter<String, String> adapter =
      new KafkaMessageDrivenChannelAdapter<>(container);

  DirectChannel output = consumingChannel();
  adapter.setOutputChannel(output);
  return adapter;
}

/**
 * Creates the listener container for the configured Kafka topic, backed by
 * {@link #consumerFactory()}.
 *
 * @return a concurrent listener container for String keys and String values
 */
@SuppressWarnings("unchecked")
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
  // The container properties carry the topic name to subscribe to.
  ContainerProperties topicProperties = new ContainerProperties(kafkaTopic);

  // consumerFactory() is declared with wildcard type arguments, hence the
  // unchecked cast to the String/String container type.
  ConcurrentMessageListenerContainer<String, String> container =
      (ConcurrentMessageListenerContainer<String, String>) new ConcurrentMessageListenerContainer<>(
          consumerFactory(), topicProperties);
  return container;
}

/**
 * Creates the consumer factory from the settings in {@link #consumerConfigs()}.
 *
 * <p>The return type states the String key and value types explicitly instead
 * of wildcards, matching the String deserializers set in the consumer
 * settings, so callers do not need an unchecked cast to get a typed factory.
 * It remains assignable wherever a {@code ConsumerFactory<?, ?>} is expected.
 *
 * @return a consumer factory for String keys and String values
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

/**
 * Assembles the Kafka consumer settings: bootstrap servers, String
 * deserializers for key and value, the consumer group id "helloworld", and
 * the "earliest" offset-reset policy.
 *
 * @return a mutable map of consumer configuration entries
 */
@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> config = new HashMap<>();

  // Where to connect.
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  // Keys and values are both read as plain strings.
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  config.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

  // Automatically reset the offset to the earliest offset.
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  return config;
}

Here is the stack trace:

 org.springframework.context.ApplicationContextException: Failed to start bean 'kafkaListenerContainer'; nested exception is java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at com.porterhead.Application.main(Application.java:25) [classes/:na]
 Caused by: java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided
at org.springframework.util.Assert.isTrue(Assert.java:92) ~[spring-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:199) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
... 12 common frames omitted


     

I don't know what I am doing wrong. Please do let me know if you need more details on this one. Thanks.

like image 754
mansoor67890 Avatar asked Dec 04 '25 17:12

mansoor67890


1 Answers

The KafkaProducerMessageHandler is a one-way component; it doesn’t produce a reply. It just publishes to the Kafka topic and does nothing more. Therefore you can’t continue the flow after it like you do with your handle(loggingHandler()). The KafkaProducerMessageHandler must be the last endpoint in the flow. This is unlike the FileWritingMessageHandler, which is an AbstractReplyProducingMessageHandler and can continue the flow.

Nevertheless, in the future please describe the problem properly: what is expected and what goes wrong. This answer is my best guess, which I could make only because I know the code of all those components.

like image 142
Artem Bilan Avatar answered Dec 07 '25 15:12

Artem Bilan