I'm writing a Kafka consumer. I need to pass a topic name that comes from an environment variable to @KafkaListener(topics = ...). This is what I have tried so far:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @Autowired
    private EnvProperties envProperties;

    private final String topic = envProperties.getTopic();

    @KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
    public void consume(String message) {
        logger.info("Consuming messages " + envProperties.getTopic());
    }
}
I'm getting an error at the line topics = "#{'${envProperties.getTopic()}'}", and the application fails to start.
How can I set this topic name dynamically from the environment variable?
Yes, Kafka's design allows consumers in one consumer group to consume messages from multiple topics. The protocol underlying consumer.poll() allows a single request to fetch from multiple partitions, including partitions that belong to different topics.
ConcurrentKafkaListenerContainerFactory comes from Spring for Apache Kafka and is meant for use within the Spring ecosystem. KafkaConsumer comes from the Apache Kafka Java client library. Both are ways to implement Kafka consumers in Java; they differ mainly in the capabilities they provide.
To create topics dynamically, you need to use an AdminClient. Spring Boot auto-configures a KafkaAdmin bean; you can create an AdminClient from its configuration properties and then use that client to create your topic(s).
The ${...} syntax resolves property placeholders rather than calling methods, so ${envProperties.getTopic()} is treated as a property key, and no such key exists.
Normally, you can't reference fields or properties from the bean in which the SpEL is declared. However, @KafkaListener has special syntax to support it.
See the documentation.
Starting with version 2.1.2, the SpEL expressions support a special token __listener, which is a pseudo bean name that represents the current bean instance within which this annotation exists.
So, if you add public EnvProperties getEnvProperties() to the class, then something like
#{__listener.envProperties.topic}
should work.
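To see why the public getter matters, it can help to look at the property lookup without Spring. SpEL reads envProperties.topic as a chain of bean property reads, roughly getEnvProperties().getTopic(). The sketch below imitates that chain with plain reflection. It is only an illustration: the nested EnvProperties and KafkaConsumer classes are stand-ins for the ones in the question, the KAFKA_TOPIC variable name and the resolve helper are hypothetical, and the real SpEL resolver does more than this (for example, it can also read public fields).

```java
import java.lang.reflect.Method;

public class PropertyPathDemo {

    // Hypothetical stand-in for the question's EnvProperties bean.
    public static class EnvProperties {
        private final String topic;

        public EnvProperties(String topic) {
            this.topic = topic;
        }

        public String getTopic() {
            return topic;
        }
    }

    // Stand-in for the listener bean; the public getter is what the answer asks for.
    public static class KafkaConsumer {
        private final EnvProperties envProperties;

        public KafkaConsumer(EnvProperties envProperties) {
            this.envProperties = envProperties;
        }

        public EnvProperties getEnvProperties() {
            return envProperties;
        }
    }

    // Walks a dotted property path by calling the public getter for each segment.
    public static Object resolve(Object root, String path) {
        Object current = root;
        for (String name : path.split("\\.")) {
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method method = current.getClass().getMethod(getter);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read property '" + name + "'", e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // KAFKA_TOPIC is an assumed variable name; "demo-topic" is a fallback for local runs.
        String topic = System.getenv().getOrDefault("KAFKA_TOPIC", "demo-topic");
        KafkaConsumer listener = new KafkaConsumer(new EnvProperties(topic));
        System.out.println(resolve(listener, "envProperties.topic"));
    }
}
```

If getEnvProperties() is removed from the stand-in KafkaConsumer, resolve fails on the first path segment, which mirrors why the listener class needs an accessible property before #{__listener.envProperties.topic} can be evaluated.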
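In the KafkaConsumer class, make the changes below:
@Autowired
public EnvProperties envProperties;
@KafkaListener(topics = "#{kafkaConsumer.envProperties.getTopic()}", groupId = "group_id")
Here kafkaConsumer is the default bean name that Spring derives from the class name. It worked for me.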