
Spring Kafka listening to regex

I am trying to listen to a newly created topic with the code below, but it is not working. Can you please tell me whether the code is correct?

public class KafkaMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    private final ProcessEventModel eventModel;

    @KafkaListener(topicPattern = "betsyncDataTopic*")
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        LOGGER.info("received payload at '{}'", consumerRecord.timestamp());
        eventModel.process(consumerRecord.value());
    }
}
krmanish007 asked Feb 12 '18 14:02



1 Answer

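Your regex does not do what you expect. topicPattern is a Java regular expression, not a glob, so betsyncDataTopic* means "betsyncDataTopi" followed by zero or more "c" characters. It should be betsyncDataTopic.*.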

@KafkaListener(id = "xxx", topicPattern = "kbgh.*")
public void listen(String in) {
    System.out.println(in);
}

...

partitions assigned: [kbgh290-0]
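To see the difference between the two patterns without a broker, here is a minimal sketch using plain java.util.regex. It assumes the pattern must match the whole topic name; the class name TopicPatternDemo and the sample topic names are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternDemo {

    // Returns true when the regex matches the entire topic name
    // (assumption: pattern subscriptions use a full match, not a partial find).
    public static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Hypothetical topic names, chosen only to show the two behaviours.
        List<String> topics = List.of("betsyncDataTopic", "betsyncDataTopicNew", "betsyncDataTopi");
        for (String topic : topics) {
            System.out.printf("%-20s original=%b corrected=%b%n",
                    topic,
                    matches("betsyncDataTopic*", topic),   // "c" repeated zero or more times
                    matches("betsyncDataTopic.*", topic)); // any suffix after the prefix
        }
    }
}
```

The original pattern accepts betsyncDataTopi and betsyncDataTopic but rejects betsyncDataTopicNew, which is why topics carrying a suffix were never picked up.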

EDIT

If you later add new topics that match the pattern, there will be a delay before the rebalance. According to the KafkaConsumer javadocs...

 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topic existing at the time of check.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that
 * belong to a particular group and will trigger a rebalance operation if one of the
 * following events trigger -
 * <ul>
 * <li>Number of partitions change for any of the subscribed list of topics
 * <li>Topic is created or deleted
 * <li>An existing member of the consumer group dies
 * <li>A new member is added to an existing consumer group via the join API
 * </ul>

I just ran a test; added a new matching topic at 12:13:32; result:

2018-02-12 12:17:30.394  INFO 88028 --- [      xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [kbgh290-0]
2018-02-12 12:17:30.450  INFO 88028 --- [      xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [kbgh290-0, kbghNew-0]

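So by default it can take up to 5 minutes for a new matching topic to be picked up. The delay is governed by the consumer property metadata.max.age.ms, which defaults to 300000 ms.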

https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
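If the default delay is too long, one option is to lower metadata.max.age.ms in the consumer configuration. The following is only a sketch of building such a property map with plain Java. The class name FastTopicDiscoveryConfig, the broker address, the group id and the 10-second value are assumptions for illustration, not values from the test above.

```java
import java.util.HashMap;
import java.util.Map;

public class FastTopicDiscoveryConfig {

    // Builds Kafka consumer properties with a custom metadata refresh interval.
    public static Map<String, Object> consumerProps(long metadataMaxAgeMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "xxx");                     // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh metadata more often so new topics matching the pattern are seen sooner.
        props.put("metadata.max.age.ms", metadataMaxAgeMs);
        return props;
    }

    public static void main(String[] args) {
        // 10 seconds instead of the 5-minute default (illustrative value only).
        System.out.println(consumerProps(10_000L));
    }
}
```

A map like this could be handed to the consumer factory that backs the listener container, for example Spring Kafka's DefaultKafkaConsumerFactory. A shorter interval means more frequent metadata requests to the brokers, so the value is a trade-off rather than something to minimise.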

Gary Russell answered Oct 08 '22 23:10