Problems adding multiple KafkaListenerContainerFactories

Hi, I'm currently dabbling in Spring Kafka and have succeeded in adding a single KafkaListenerContainerFactory to my listener. Now I'd like to add multiple KafkaListenerContainerFactory beans (one for a topic whose messages are JSON, another for a topic whose messages are plain strings). See the code below:

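import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// KafkaConfiguration and Record are application classes that are not shown here.
@EnableKafka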
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,Record> jsonConsumerFactory(){
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String,Object> jsonConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,String> fileConsumerFactory(){
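        // Deserializers are passed explicitly because fileConsumerConfigs() sets no key/value deserializer properties.
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs(), new StringDeserializer(), new StringDeserializer());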
    }

    @Bean
    public Map<String,Object> fileConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}

Running this gives me the following error:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

What am I doing wrong?

Omar Andres Olivares Rodriguez asked Mar 31 '17 13:03


3 Answers

It looks like you don't intend to rely on Spring Boot's Kafka auto-configuration.

Spring Boot provides in the KafkaAutoConfiguration:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {

Since you define jsonConsumerFactory and fileConsumerFactory, the kafkaConsumerFactory bean from the auto-configuration backs off and is not created.

On the other hand, none of your factories can be used by KafkaAnnotationDrivenConfiguration:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

That is because none of your ConsumerFactory beans is of type ConsumerFactory<Object, Object>.
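To illustrate the mismatch outside Spring, here is a minimal plain-Java sketch. It is not Spring's actual resolution logic (Spring applies richer assignability rules); it only compares the generic type arguments of an injection point with those of two provider methods. Factory, jsonFactory, objectFactory and listenerFactory are hypothetical stand-ins, and Integer stands in for the Record payload type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class GenericMatchSketch {

    // Hypothetical stand-in for ConsumerFactory<K, V>.
    interface Factory<K, V> {}

    // Stand-ins for bean methods; only their declared return types matter here.
    static Factory<String, Integer> jsonFactory() { return null; }
    static Factory<Object, Object> objectFactory() { return null; }

    // Stand-in for the injection point that asks for Factory<Object, Object>.
    static void listenerFactory(Factory<Object, Object> required) {}

    // Simplified check: true only if the provider's generic type arguments
    // are exactly those of the injection point.
    public static boolean matches(String providerMethod) {
        try {
            Type required = GenericMatchSketch.class
                    .getDeclaredMethod("listenerFactory", Factory.class)
                    .getGenericParameterTypes()[0];
            Type provided = GenericMatchSketch.class
                    .getDeclaredMethod(providerMethod)
                    .getGenericReturnType();
            return Arrays.equals(
                    ((ParameterizedType) required).getActualTypeArguments(),
                    ((ParameterizedType) provided).getActualTypeArguments());
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Unknown method: " + providerMethod, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Factory<String, Integer> matches: " + matches("jsonFactory"));
        System.out.println("Factory<Object, Object> matches:  " + matches("objectFactory"));
    }
}
```

In this sketch only the Factory<Object, Object> provider satisfies the injection point, which is the same reason a ConsumerFactory<String, Record> bean is not picked up for a ConsumerFactory<Object, Object> parameter.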

So:

  • Exclude KafkaAutoConfiguration from Spring Boot's auto-configuration by adding the following to the application properties file: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
  • or rename one of your KafkaListenerContainerFactory beans to kafkaListenerContainerFactory, so that it replaces the one Boot would otherwise create
  • or declare one of the ConsumerFactory beans as ConsumerFactory<Object, Object>.
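As a rough sketch of the second option, assuming the fileConsumerFactory bean from the question is present, one of the factories could be exposed under the name Boot checks for. The class name DefaultListenerFactoryConfig is hypothetical, and this is a configuration fragment to adapt rather than a definitive setup:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class DefaultListenerFactoryConfig { // hypothetical class name

    // The method name gives the bean the name "kafkaListenerContainerFactory",
    // which is the name Boot's @ConditionalOnMissingBean check looks for.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> fileConsumerFactory) { // bean defined in the question
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory);
        return factory;
    }
}
```

With a bean of that name in place, Boot's own kafkaListenerContainerFactory method is skipped, so the ConsumerFactory<Object, Object> parameter is never requested. Listeners that do not set containerFactory would use this factory, while the JSON listener would still refer to kafkaJsonListenerContainerFactory by name.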
Artem Bilan answered Nov 20 '22 20:11


I achieved this with the code below, and it works fine for me.

// LISTENER 1
@Bean
@ConditionalOnMissingBean(name = "yourListenerFactory1")
public ConsumerFactory<String, YourCustomObject1> yourConsumerFactory1() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-1");
   return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
     new JsonDeserializer<>(YourCustomObject1.class));
}


@Bean(name = "yourListenerFactory1")
public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> 
  yourListenerFactory1() {
   ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> factory =
       new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(yourConsumerFactory1());
   ContainerProperties containerProperties = factory.getContainerProperties();
   containerProperties.setPollTimeout(...);
   containerProperties.setAckMode(AckMode...);
   return factory;
}


// LISTENER 2
@Bean
@ConditionalOnMissingBean(name = "yourListenerFactory2")
public ConsumerFactory<String, YourCustomObject2> yourConsumerFactory2() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-2");
   return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
      new JsonDeserializer<>(YourCustomObject2.class));
}


@Bean(name = "yourListenerFactory2")
public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> 
   yourListenerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> factory 
         =  new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(yourConsumerFactory2());
    ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setPollTimeout(...);
    containerProperties.setAckMode(AckMode...);
    return factory;
 }

I also set the spring.autoconfigure.exclude property, which is a must for this setup:

spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

This is my consumer configuration:

Consumer 1

@KafkaListener(id = "your-consumer-1",
  topicPattern = "your-topic-1",
  containerFactory = "yourListenerFactory1")
 public void consumer1(YourCustomObject1 data,
                       Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }

Consumer 2

  @KafkaListener(id = "your-consumer-2",
                 topicPattern = "your-topic-2",
                 containerFactory = "yourListenerFactory2")
  public void consumer2(YourCustomObject2 data,
                        Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ...  }

Also, my KafkaTemplate was declared as:

@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
Hitesh Kumar answered Nov 20 '22 19:11


You can specify the container factory for each listener in its @KafkaListener definition, as follows:

@KafkaListener(topics = "fileTopic", containerFactory = "kafkaFileListenerContainerFactory")
public void fileConsumer(...) {...}

@KafkaListener(topics = "jsonTopic", containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonConsumer(...) {...}
ayortanli answered Nov 20 '22 19:11