I have a chat application built with Spring WebFlux. When a new message is created, all subscribers receive it (this is a simplified example of my problem). That part works correctly. Now I need to raise an event for each subscriber when a new message arrives.
This is my code:
Controller:
@Autowired
@Qualifier("ptpReplayProcessor")
private ReplayProcessor<String> ptpReplayProcessor;

@GetMapping(value = "/chat/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> subscribe() {
    return Flux.from(ptpReplayProcessor);
}
ReplayProcessorConfig:
@Configuration
public class ReplayProcessorConfig {

    @Bean("ptpReplayProcessor")
    ReplayProcessor<String> ptpReplayProcessor() {
        ReplayProcessor<String> replayProcessor = ReplayProcessor.create(0, false);
        replayProcessor.subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnNext(String ptp) {
                System.out.println("replay processor called!");
            }
        });
        return replayProcessor;
    }
}
pom.xml
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
When a new message is created, I call ptpReplayProcessor.onNext(message). This works correctly and all clients receive the message, but the phrase "replay processor called!" is printed only for the sender of the message. I want to raise an event for every client that receives a new message. I also tried the ReplayProcessor.doOnNext() and ReplayProcessor.doOnEach() methods, but they did not work. Is there any way to do this?
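You can do this in your subscribe method:
return Flux.from(ptpReplayProcessor).doOnNext(s -> System.out.println("new message has been sent"));
Each client subscribes to the Flux returned by this method, so the doOnNext callback runs once per connected client for every message.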
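To illustrate why the placement of the hook matters, here is a minimal plain-Java sketch. It does not use Reactor: the Broadcaster class, subscribeWithHook, and the client names "alice" and "bob" are hypothetical stand-ins, assumed only for illustration. It models a hook subscribed once at bean creation (as in ReplayProcessorConfig) next to a hook attached per client (as doOnNext does in the controller's subscribe method).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot publisher such as ReplayProcessor; this is NOT Reactor's API.
class PerSubscriberHookSketch {

    /** Minimal broadcaster: every registered subscriber receives every published message. */
    static final class Broadcaster {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }

        /** Models a per-client pipeline: the hook runs on each delivery to this one client. */
        void subscribeWithHook(Consumer<String> hook, Consumer<String> client) {
            subscribe(message -> {
                hook.accept(message);
                client.accept(message);
            });
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Broadcaster broadcaster = new Broadcaster();

        // Like the BaseSubscriber in the config class: subscribed once, so it fires once per message.
        broadcaster.subscribe(m -> log.add("config hook: " + m));

        // Like two clients calling /chat/subscribe: each one gets its own hook.
        for (String client : List.of("alice", "bob")) {
            broadcaster.subscribeWithHook(
                    m -> log.add("hook for " + client + ": " + m),
                    m -> log.add(client + " received: " + m));
        }

        broadcaster.onNext("hello");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the config-level hook logs a single line per message, while the per-client hook logs one line for each connected client, which is the behaviour the question asks for.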