I am attempting to load in multiple topics to a single @KafkaListener
but am running into trouble as I believe it is looking for a constant value, but initializing the topics
variable from the application.yml
file causing something issues, I was wondering if someone could help me troubleshoot this issue, or provide me with direction into how to load multiple Kafka topics into a single KafkaListener.
I am able to listen to multiple topics in the same @KafkaListener
by passing them in a comma delimited object as seen below:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
I realize I could do an object with comma delimited values representing the topics, but I want to be able to add topics through a config file, rather than changing code in the code base.
I believe there may be the problem in that @KafkaListener needs to take in a constant value, and I am unable to define an annotation as a constant, is there any way around this?
KafkaWebSocketConnector.java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events,
canceled-events,
pax-events,
flight-delay-events
You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.
The @KafkaListener is an annotation that marks a method or class as the target of incoming messages. By using @KafkaListener, you abstract away the need to configure the underlying KafkaMessageListenerContainer.
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException .
Answer provided from @Gary Russell from this GitHub issue:
https://github.com/spring-projects/spring-kafka/issues/361
You can use a SpEL expression; there's an example in EnableKafkaIntegrationTests...
@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
In my case "#{'${spring.kafka.topics}'.split(',')}"
I was able to implement the above code, (provided by Gary Russell) in order to answer the above question.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With