How to pass dynamic topic name to @KafkaListener(topics) from environment variable

I'm writing a Kafka consumer. I need to pass a topic name that comes from an environment variable to @KafkaListener(topics = ...). This is what I have tried so far:

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener; 
 import org.springframework.stereotype.Service;

 @Service
 public class KafkaConsumer {

     @Autowired
     private EnvProperties envProperties;

     private final String topic = envProperties.getTopic();

     @KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
     public void consume(String message) {
         logger.info("Consuming messages " + envProperties.getTopic());
     }
 }

I'm getting an error at the line topics = "#{'${envProperties.getTopic()}'}", and the application fails to start.

How can I set this topic name dynamically from the environment variable?

CodeCool asked Jan 04 '19 12:01



2 Answers

Normally, you can't reference fields or properties from the bean in which the SpEL is declared. However, @KafkaListener has special syntax to support it.

See the documentation.

Starting with version 2.1.2, the SpEL expressions support a special token __listener which is a pseudo bean name which represents the current bean instance within which this annotation exists.

So if you add a public EnvProperties getEnvProperties() getter to the class, then something like

#{__listener.envProperties.topic}

should work.
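Put together, the suggestion above might look like the following minimal sketch. It assumes spring-kafka 2.1.2 or later and that everything lives in one file, KafkaConsumer.java. The EnvProperties class shown here is only a hypothetical stand-in for the asker's own class (which the question does not show), and the KAFKA_TOPIC variable name and the SLF4J logger are likewise assumptions. The sketch also drops the `private final String topic = envProperties.getTopic();` field from the question, because a field initializer runs while the object is being constructed, before Spring has injected envProperties.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Assumption: SLF4J is on the classpath (the question's `logger` is not shown).
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @Autowired
    private EnvProperties envProperties;

    // Public getter so that SpEL can resolve `__listener.envProperties`.
    public EnvProperties getEnvProperties() {
        return envProperties;
    }

    // `__listener` is the pseudo bean name for the bean that declares this annotation.
    @KafkaListener(topics = "#{__listener.envProperties.topic}", groupId = "group_id")
    public void consume(String message) {
        logger.info("Consumed from {}: {}", envProperties.getTopic(), message);
    }
}

// Hypothetical stand-in for the asker's EnvProperties bean.
@Component
class EnvProperties {

    // Assumption: the topic name is supplied in an environment variable named KAFKA_TOPIC.
    @Value("${KAFKA_TOPIC}")
    private String topic;

    public String getTopic() {
        return topic;
    }
}
```

The expression is evaluated when the listener is registered, after dependency injection has completed, so the getter returns a populated EnvProperties at that point.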

Gary Russell answered Oct 17 '22 05:10


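In the KafkaConsumer class, make the following changes: expose the injected field as public and refer to the bean by name in the SpEL expression. Here kafkaConsumer is the default bean name that Spring derives from the class name KafkaConsumer.

@Autowired
public EnvProperties envProperties;

@KafkaListener(topics = "#{kafkaConsumer.envProperties.getTopic()}", groupId = "group_id")

This worked for me.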

Himi-Hash answered Oct 17 '22 03:10