Right now I have a simple Kafka consumer and producer implemented in my Spring Boot application, and they work fine. What I want to do next is have my consumer take each consumed message and broadcast it directly to all subscribed clients. I figured out that I cannot use STOMP messaging with WebFlux, so how can I accomplish this? I saw the reactive WebSocket implementation, but I could not figure out how to send my consumed data to the WebSocket.
This is my simple Kafka producer:
fun addMessage(message: Message) {
    val headers: MutableMap<String, Any> = HashMap()
    headers[KafkaHeaders.TOPIC] = topicName
    kafkaTemplate.send(GenericMessage<Message>(message, headers))
}
And my simple Consumer looks like this:
@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
fun receiveData(message: Message) {
    // Take consumed data and send to websocket
}
I would consider having a Sinks.many().multicast().onBackpressureBuffer() as a global intermediate container. Then, in your receiveData(), you just emit the data into that Reactor abstraction.
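A minimal sketch of that idea, assuming a hypothetical KafkaToSinkBridge component and a placeholder Message data class standing in for the type from the question (neither name comes from the original code):

```kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

// Hypothetical payload type; replace with the Message class from the question.
data class Message(val text: String)

// Hypothetical bridge component: Kafka listener on one side, Flux on the other.
@Component
class KafkaToSinkBridge {

    // Multicast sink shared by all subscribers; each emitted item goes to every current subscriber.
    private val sink: Sinks.Many<Message> =
        Sinks.many().multicast().onBackpressureBuffer()

    @KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
    fun receiveData(message: Message) {
        // tryEmitNext reports problems through its return value instead of throwing.
        val result = sink.tryEmitNext(message)
        if (result.isFailure) {
            // Possible causes include a full buffer or a terminated sink.
            println("Could not emit message to sink: $result")
        }
    }

    // Read-only view of the sink for downstream consumers such as a WebSocket handler.
    fun messages(): Flux<Message> = sink.asFlux()
}
```

Two things may be worth checking in a real application. As far as I know, the no-argument onBackpressureBuffer() shuts the sink down once its last subscriber cancels, and the onBackpressureBuffer(bufferSize, autoCancel) overload with autoCancel = false avoids that. Also, if the listener container runs with more than one consumer thread, concurrent tryEmitNext calls can fail, so the returned EmitResult should not be ignored.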
For your connected WebSocket sessions, I would suggest implementing an org.springframework.web.reactive.socket.WebSocketHandler and passing Sinks.Many.asFlux() to the WebSocketSession.send(Publisher<WebSocketMessage> messages) API. This way, all your sessions consume the same Kafka data for as long as they are connected to this WebSocket server.
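The handler side could be sketched roughly as follows. BroadcastWebSocketHandler, broadcastMapping, and the /messages path are hypothetical names chosen for illustration, and the sketch assumes the Kafka payload has already been converted to text (for example JSON) before it reaches the handler:

```kotlin
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical handler: every connected session subscribes to the same shared Flux.
class BroadcastWebSocketHandler(private val source: Flux<String>) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // Wrap each text payload in a WebSocketMessage bound to this session.
        val outbound = source.map { payload -> session.textMessage(payload) }
        // send() subscribes to the Flux and writes each message to the client.
        return session.send(outbound)
    }
}

// Hypothetical mapping; in an application this would typically be exposed as a @Bean.
// The "/messages" path is an assumption; the order of -1 places it ahead of annotated controllers.
fun broadcastMapping(handler: WebSocketHandler): HandlerMapping =
    SimpleUrlHandlerMapping(mapOf("/messages" to handler), -1)
```

The source Flux would be the asFlux() view of the shared sink, mapped to String. Depending on the Spring version, a WebSocketHandlerAdapter bean may also need to be declared; the linked documentation covers that setup.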
See more info in docs: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler
UPDATE
You can find a sample here: https://github.com/artembilan/sandbox/tree/master/so-65667450