I am trying to listen to a newly created topic with the code below, but it is not working. Can you please tell me if the code is correct?
@Component
public class KafkaMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
    private final ProcessEventModel eventModel;

    public KafkaMessageListener(ProcessEventModel eventModel) {
        this.eventModel = eventModel;
    }

    @KafkaListener(topicPattern = "betsyncDataTopic*")
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        LOGGER.info("received payload at '{}'", consumerRecord.timestamp());
        eventModel.process(consumerRecord.value());
    }
}
Your regex is not valid; it should be betsyncDataTopic.* (note the dot before the *).
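Applied to your listener, only the pattern needs to change (a sketch; the rest of the class stays as in your question):

@KafkaListener(topicPattern = "betsyncDataTopic.*")   // '.' matches any character, '*' repeats it
public void receive(ConsumerRecord<String, String> consumerRecord) {
    // ... same body as in your question
}

As a sanity check, here is a minimal listener using a different pattern: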
@KafkaListener(id = "xxx", topicPattern = "kbgh.*")
public void listen(String in) {
System.out.println(in);
}
...
partitions assigned: [kbgh290-0]
EDIT
If you later add new topics that match the pattern, there will be a delay before the rebalance. According to the KafkaConsumer javadocs...
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that
* belong to a particular group and will trigger a rebalance operation if one of the
* following events trigger -
* <ul>
* <li>Number of partitions change for any of the subscribed list of topics
* <li>Topic is created or deleted
* <li>An existing member of the consumer group dies
* <li>A new member is added to an existing consumer group via the join API
* </ul>
I just ran a test; I added a new matching topic at 12:13:32; result:
2018-02-12 12:17:30.394 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [kbgh290-0]
2018-02-12 12:17:30.450 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [kbgh290-0, kbghNew-0]
So, by default, it can take up to 5 minutes; see metadata.max.age.ms:
https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
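If that delay is too long for you, one way to shorten it (not part of the original answer, just a sketch) is to lower metadata.max.age.ms on the consumer so the pattern is re-checked more often, for example in a consumer factory bean:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "betsync-group");           // assumption: any group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Refresh topic metadata every 30 seconds instead of the default 5 minutes,
        // so newly created topics matching the pattern are picked up sooner.
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

In Spring Boot the same can be achieved with spring.kafka.consumer.properties.metadata.max.age.ms=30000 in application.properties; keep in mind that a lower value means more frequent metadata requests to the brokers.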