Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Enabling @KafkaListener to take in variable topic names from application.yml file

I am attempting to load in multiple topics to a single @KafkaListener but am running into trouble as I believe it is looking for a constant value, but initializing the topics variable from the application.yml file causing something issues, I was wondering if someone could help me troubleshoot this issue, or provide me with direction into how to load multiple Kafka topics into a single KafkaListener.

I am able to listen to multiple topics in the same @KafkaListener by passing them in a comma delimited object as seen below:

@KafkaListener(topics = {
           "flight-events",
           "flight-time-events",
           "service-events",
           "flight-delay-events"
   })

I realize I could do an object with comma delimited values representing the topics, but I want to be able to add topics through a config file, rather than changing code in the code base.

I believe there may be the problem in that @KafkaListener needs to take in a constant value, and I am unable to define an annotation as a constant, is there any way around this?

KafkaWebSocketConnector.java

@Component
public class KafkaWebSocketConnector
{


   @Value("${spring.kafka.topics}")
   private String[] topics;

   @KafkaListener(topics = topics)
   public void listen(ConsumerRecord<?, Map<String, String>> message)
   {
      log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
      String dest = "/" + message.topic();
      log.info("destination = {}", dest);
      log.info("msg: {}", message);
      messageTemplate.convertAndSend(dest, message.value());
   }
}

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: kafka-websocket-connector
    topics: flight-events,
      flight-time-events,
      canceled-events,
      pax-events,
      flight-delay-events
like image 702
terrabl Avatar asked Jul 05 '17 19:07

terrabl


People also ask

How do you pass topics dynamically to a Kafka listener?

You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.

What does the @KafkaListener annotation do?

The @KafkaListener is an annotation that marks a method or class as the target of incoming messages. By using @KafkaListener, you abstract away the need to configure the underlying KafkaMessageListenerContainer.

Is Kafka listener thread safe?

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException .


1 Answers

Answer provided from @Gary Russell from this GitHub issue:

https://github.com/spring-projects/spring-kafka/issues/361

You can use a SpEL expression; there's an example in EnableKafkaIntegrationTests...

@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

In my case "#{'${spring.kafka.topics}'.split(',')}"

I was able to implement the above code, (provided by Gary Russell) in order to answer the above question.

like image 159
terrabl Avatar answered Oct 12 '22 13:10

terrabl