Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring WebFlux with Kafka and Websockets

Tags:

Right now I have a simple Kafka Consumer and Producer implemented in My SpringBoot application , which works fine what I wanna do next is that my consumer takes the consumed message and directly broadcasts it to all subscribed clients. I figured out that i can not use STOMP Messaging with WebFlux, so how can I accomplish this task , I saw the reactive WebSocket implementation but I did not figure out how I could send my consumed data to my websocket.

That is my simple KafkaProducer:

fun addMessage(message: Message){
        val headers : MutableMap<String, Any> = HashMap()
        headers[KafkaHeaders.TOPIC] = topicName
        kafkaTemplate.send(GenericMessage<Message>(message, headers))
    }

And my simple Consumer looks like this:

@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
    fun receiveData(message:Message) :Message{
        //Take consumed data and send to websocket
    }
like image 771
Yasin Eraslan Avatar asked Jan 11 '21 13:01

Yasin Eraslan


1 Answers

I would consider to have a Sinks.many().multicast().onBackpressureBuffer() as global intermediate container. Then in your receiveData() you just sink data into that Reactor abstraction.

For your WebSocket connected sessions I would suggest to implement a org.springframework.web.reactive.socket.WebSocketHandler and use Sinks.Many.asFlux() in the WebSocketSession.send(Publisher<WebSocketMessage> messages) API. This way all your sessions are going to consume the same Kafka data as long as they are connected to this WebSocket server.

See more info in docs: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler

UPDATE

You can find some sample here: https://github.com/artembilan/sandbox/tree/master/so-65667450

like image 152
Artem Bilan Avatar answered Oct 14 '22 01:10

Artem Bilan