
Spring reactive - event on receiving new message

I have a chat application built with Spring WebFlux. When a new message is created, all subscribers receive it (this is a simplified example of my problem). Everything works correctly, but now I need an event to fire for each subscriber when a new message arrives.

This is my code:

Controller:

@Autowired
@Qualifier("ptpReplayProcessor")
private ReplayProcessor<String> ptpReplayProcessor;

@GetMapping(value = "/chat/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> subscribe() {
    return Flux.from(ptpReplayProcessor);
}

ReplayProcessorConfig:

@Configuration
public class ReplayProcessorConfig {
    @Bean("ptpReplayProcessor")
    ReplayProcessor<String> ptpReplayProcessor() {
        ReplayProcessor<String> replayProcessor = ReplayProcessor.create(0, false);
        replayProcessor.subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnNext(String ptp) {
                System.out.println("replay processor called!");
            }

        });
        return replayProcessor;
    }
}

pom.xml:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

When a new message is created, I call ptpReplayProcessor.onNext(message). This works correctly and all clients receive the message, but the phrase "replay processor called!" is printed only for the sender of the message. I want to raise an event for every client when a new message is received. I also tried the ReplayProcessor.doOnNext() and ReplayProcessor.doOnEach() methods, but they did not work. Is there any way to do this?

hamed asked Oct 27 '22 17:10



1 Answer

You can do this in your subscribe method, so that the callback is attached to the Flux returned to each subscriber:

return Flux.from(ptpReplayProcessor).doOnNext(s -> System.out.println("new message has been sent"));
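
To see why the placement matters, here is a minimal plain-JDK sketch. It is not Reactor code: Broadcaster, withHook and run are hypothetical names invented for illustration, and the model deliberately ignores threading, backpressure and replay. It only shows the counting behaviour: a callback registered once on the shared source (like the BaseSubscriber in the configuration bean) runs once per message, while a callback attached to each subscription (like doOnNext inside subscribe()) runs once per connected client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-JDK model of a hot broadcaster; hypothetical, not a Reactor API. */
public class PerSubscriberHookSketch {

    /** Pushes every message to all currently registered subscribers. */
    static final class Broadcaster {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /** Runs a side effect before the downstream consumer; models doOnNext on one pipeline. */
    static Consumer<String> withHook(Consumer<String> hook, Consumer<String> downstream) {
        return hook.andThen(downstream);
    }

    /** Returns {calls of the shared hook, calls of the per-client hook} after one message. */
    static int[] run(int clients) {
        int[] counts = new int[2];
        Broadcaster broadcaster = new Broadcaster();

        // Registered once at configuration time, like the BaseSubscriber in the bean.
        broadcaster.subscribe(message -> counts[0]++);

        // Each client subscription carries its own hook, like doOnNext inside subscribe().
        for (int i = 0; i < clients; i++) {
            broadcaster.subscribe(withHook(message -> counts[1]++, message -> { /* deliver to client */ }));
        }

        broadcaster.onNext("hello");
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = run(3);
        System.out.println("shared hook calls: " + counts[0]);      // prints 1
        System.out.println("per-client hook calls: " + counts[1]);  // prints 3
    }
}
```

In the real application the per-client callback still runs on the server, once for each open /chat/subscribe stream; reacting inside the browser would have to be done in the client code that consumes the event stream.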
Nonika answered Nov 15 '22 06:11
