I have implemented a Kafka consumer, and now I have the following scenario.
A scheduled Quartz job (already written) copies data from table 1 to table 2. While that copy is running, I want my Kafka listener to be paused, and once the copy is done, the listener should resume.
My implementation:
@KafkaListener(topicPartitions =
    { @TopicPartition(topic = "data_pipe", partitions = { "0" }) })
public void listen(ConsumerRecord<String, String> cr) throws Exception {
    // ...
}
If you use the @KafkaListener annotation, Spring automatically creates a KafkaListenerEndpointRegistry bean, so you can inject it and use it like this:
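import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaManager {

    private final KafkaListenerEndpointRegistry registry;

    public KafkaManager(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Pause every container created for a @KafkaListener; takes effect before the next poll.
    public void pause() {
        registry.getListenerContainers().forEach(MessageListenerContainer::pause);
    }

    // Resume every container created for a @KafkaListener.
    public void resume() {
        registry.getListenerContainers().forEach(MessageListenerContainer::resume);
    }
}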
Documentation: https://docs.spring.io/spring-kafka/reference/html/#pause-resume
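To tie this back to the Quartz job, one possible way to wire it is to pause before the copy and resume in a finally block, so the listener comes back even if the copy fails. The sketch below is only an illustration under assumptions: ConsumerControl and PausedCopy are hypothetical names that do not come from Spring or Quartz, and ConsumerControl simply stands in for the KafkaManager bean above so the snippet compiles without any framework on the classpath.

```java
/** Hypothetical stand-in for the KafkaManager bean (only pause/resume are needed). */
interface ConsumerControl {
    void pause();
    void resume();
}

/** Hypothetical helper: runs a task while the consumer is paused. */
final class PausedCopy {

    private PausedCopy() {
    }

    /**
     * Pauses the consumer, runs the copy task, and always resumes the
     * consumer afterwards, even when the copy task throws.
     */
    static void run(ConsumerControl consumer, Runnable copyTask) {
        consumer.pause();
        try {
            copyTask.run();
        } finally {
            // Runs on both success and failure, so the listener is never left paused.
            consumer.resume();
        }
    }
}
```

Inside the Quartz job's execute method, the table copy would be passed as the Runnable, with pause and resume delegating to KafkaManager. Note that a pause only takes effect before the next poll, so records already fetched by the previous poll may still be delivered to the listener after pause() is called.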