
Integrating Spring Boot and Reactor-Kafka's KafkaReceiver

I am trying to develop a Spring Boot application using the library reactor-kafka to react to some messages read from a Kafka topic.

I have a configuration class that builds a KafkaReceiver.

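import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@Configuration
public class MyConfiguration {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        // Note: `consumer` is defined elsewhere (not shown here).
        final ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, String>create(props)
                               .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    }
}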

What now? With the not-so-reactive spring-kafka library, I can annotate a method with @KafkaListener, and Spring Boot will create a thread for me that listens to a Kafka topic.

Where should I place the KafkaReceiver instead? All the examples I found use it directly in the main method, but that is not the Boot way.

I am using Spring Boot 2.1.3 and Reactor-Kafka 1.1.0.

Thanks in advance.

riccardo.cardin asked Mar 19 '19 13:03



1 Answer

Since you already have the KafkaReceiver bean, you can do something like this:

@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
    return args -> {
        kafkaReceiver.receive()
                     ...
                     .subscribe();
    };
}

This ApplicationRunner bean is invoked once the ApplicationContext is ready. See its JavaDocs for more info.
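If you also want to stop consuming cleanly when the application shuts down, one option is to keep the Disposable returned by subscribe() and dispose it when the bean is destroyed. The following is only a minimal sketch, assuming the KafkaReceiver<String, String> bean from the question is available for injection. The class name KafkaReceiverRunner and the println-based handling are hypothetical placeholders, not part of the original code.

```java
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;

// Hypothetical component name; it plays the same role as the ApplicationRunner bean above.
@Component
public class KafkaReceiverRunner implements ApplicationRunner, DisposableBean {

    private final KafkaReceiver<String, String> kafkaReceiver;

    // Handle to the running subscription, kept so it can be cancelled on shutdown.
    private volatile Disposable subscription;

    public KafkaReceiverRunner(KafkaReceiver<String, String> kafkaReceiver) {
        this.kafkaReceiver = kafkaReceiver;
    }

    @Override
    public void run(ApplicationArguments args) {
        // Called by Spring Boot once the ApplicationContext is ready.
        subscription = kafkaReceiver.receive()
                .doOnNext(record -> {
                    // Placeholder processing: replace with real handling.
                    System.out.println("Received: " + record.value());
                    // Mark the offset as processed so it becomes eligible for commit.
                    record.receiverOffset().acknowledge();
                })
                .subscribe(
                        record -> { },
                        error -> System.err.println("Receiver terminated: " + error));
    }

    @Override
    public void destroy() {
        // Cancel the subscription (if one was started) when the context closes.
        if (subscription != null) {
            subscription.dispose();
        }
    }
}
```

Because subscribe() returns immediately, run() does not block application startup; the records are processed on Reactor's own threads. Whether you acknowledge manually as above or use a different receive variant depends on the delivery semantics you need.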

Artem Bilan answered Oct 21 '22 07:10