
How to dynamically add elements to a Reactor hot Flux from another method?

I have a data source service, which takes an observer as a parameter.

void subscribe(Consumer onEventConsumer);

I want to use a Flux as the response stream for RSocket. How can I do this? As I see it now, it should be something like this:

Flux<T> controllerMethod(RequestMessage mgs) {
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;
}

But I strongly doubt that this is a proper solution. I'm new to the reactive approach and don't know which methods I should use here.

Mr.Ustiik asked Oct 20 '25 16:10



1 Answer

As Simon already pointed out, this is what you use Flux.create for.

Take a look at the Getting Started Guide on projectreactor.io.

In short, you register a custom listener inside the lambda of the create method:

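Flux<String> bridge = Flux.create(sink -> {
    // Register a listener that forwards every callback to the sink.
    myEventProcessor.register(
        new MyEventListener<String>() {

            public void onDataChunk(List<String> chunk) {
                for (String s : chunk) {
                    sink.next(s); // emit each element on the Flux
                }
            }

            public void processComplete() {
                sink.complete(); // signal that no more elements will follow
            }
        });
});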

In other words, you pass the incoming elements on to the FluxSink, which then publishes those elements on the Flux.
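Applied to the data source from the question, a minimal sketch could look like the following. Only subscribe(Consumer) comes from the question; the DataSource class, its emit helper, and the bridge method name are hypothetical stand-ins so that the example is self-contained. It assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import reactor.core.publisher.Flux;

public class DataSourceBridge {

    // Hypothetical stand-in for the asker's data source.
    // Only the subscribe(Consumer) signature is taken from the question.
    static class DataSource<T> {
        private final List<Consumer<T>> consumers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> onEventConsumer) {
            consumers.add(onEventConsumer);
        }

        // Hypothetical helper that simulates an incoming event.
        void emit(T event) {
            consumers.forEach(c -> c.accept(event));
        }
    }

    // The lambda runs once per subscriber of the returned Flux and
    // registers sink::next as the consumer, so each event is pushed to the sink.
    static <T> Flux<T> bridge(DataSource<T> dataSource) {
        return Flux.create(sink -> dataSource.subscribe(sink::next));
    }

    public static void main(String[] args) {
        DataSource<String> dataSource = new DataSource<>();
        List<String> received = new ArrayList<>();

        bridge(dataSource).subscribe(received::add);

        dataSource.emit("a");
        dataSource.emit("b");

        System.out.println(received); // prints [a, b]
    }
}
```

The controller method would then simply return bridge(dataSource). Note that every subscriber registers a new consumer with the data source; if that is not wanted, calling share() on the resulting Flux is one option. If the data source also offers a way to unregister a consumer (the question does not show one), that call could be hooked in through sink.onDispose(...).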

kerner1000 answered Oct 23 '25 04:10



