How to create multiple Flux from async callbacks

From Reactor's reference guide, I learned that Flux.create() can be used to convert an async callback into a Flux.

However, some callbacks have multiple methods that receive different types of data. Suppose I have the following code:

asrService.recognize(new Callback() {
    @Override
    public void stateChange(State state) {
        // consume state
    }

    @Override
    public void onResultData(Result result) {
        // consume result
    }
});

How can I convert it into two reactive streams, Flux<State> and Flux<Result>?

Liang Chen asked Sep 18 '25 04:09


1 Answer

One way is to use processors such as DirectProcessor: create two processors, emit each event to the matching processor from the callback, and subscribe to the processors. If you still want to use Flux.create, you can do it like this:

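Flux<Object> objectFlux;

@Override
public void run(String... args) throws Exception {
    // Funnel both callback methods into a single sink.
    // Note: this lambda runs once per subscriber, so recognize() is invoked
    // for every subscription unless the Flux is shared (for example with share()).
    objectFlux = Flux.create(objectFluxSink ->
            asrService.recognize(new Callback() {
                @Override
                public void stateChange(State state) {
                    objectFluxSink.next(state);
                }

                @Override
                public void onResultData(Result result) {
                    objectFluxSink.next(result);
                }
            }));
}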

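// Keep only the Result items from the merged stream.
public Flux<Result> getResult() {
    return objectFlux.filter(o -> o instanceof Result)
            .map(o -> (Result) o);
}

// Keep only the State items from the merged stream.
public Flux<State> getState() {
    return objectFlux.filter(o -> o instanceof State)
            .map(o -> (State) o);
}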

I still think that using processors is much cleaner, because you don't need the filtering and casting, but you do need two processors, like this:

DirectProcessor<Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
    @Override
    public void stateChange(State state) {
        stateDirectProcessor.onNext(state);
    }

    @Override
    public void onResultData(Result result) {
        resultDirectProcessor.onNext(result);
    }
});
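
As a side note, DirectProcessor is deprecated as of Reactor 3.4 in favour of the Sinks API, but the idea of one publisher per callback method does not depend on a particular library. Below is a minimal, dependency-free sketch of that idea, using the JDK's java.util.concurrent.SubmissionPublisher as a stand-in for DirectProcessor. The State, Result, Callback and AsrService types are hypothetical stand-ins for the ones in the question, and the service here simply fires a fixed sequence of callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class CallbackSplitDemo {
    // Hypothetical stand-ins for the question's types.
    static final class State {
        final String name;
        State(String name) { this.name = name; }
    }

    static final class Result {
        final String text;
        Result(String text) { this.text = text; }
    }

    interface Callback {
        void stateChange(State state);
        void onResultData(Result result);
    }

    // Hypothetical service: invokes the callback methods in a fixed order.
    static final class AsrService {
        void recognize(Callback callback) {
            callback.stateChange(new State("LISTENING"));
            callback.onResultData(new Result("hello"));
            callback.stateChange(new State("DONE"));
            callback.onResultData(new Result("world"));
        }
    }

    // Returns [state names, result texts] as seen by the two subscribers.
    static List<List<String>> run() {
        List<String> states = new CopyOnWriteArrayList<>();
        List<String> results = new CopyOnWriteArrayList<>();

        // One typed publisher per callback method.
        SubmissionPublisher<State> statePublisher = new SubmissionPublisher<>();
        SubmissionPublisher<Result> resultPublisher = new SubmissionPublisher<>();

        // Subscribe before any event is emitted; late subscribers miss earlier items.
        CompletableFuture<Void> statesDone = statePublisher.consume(s -> states.add(s.name));
        CompletableFuture<Void> resultsDone = resultPublisher.consume(r -> results.add(r.text));

        // Bridge: each callback method forwards to its own publisher.
        new AsrService().recognize(new Callback() {
            @Override
            public void stateChange(State state) {
                statePublisher.submit(state);
            }

            @Override
            public void onResultData(Result result) {
                resultPublisher.submit(result);
            }
        });

        // Signal completion, then wait until both subscribers have drained.
        statePublisher.close();
        resultPublisher.close();
        statesDone.join();
        resultsDone.join();

        List<List<String>> out = new ArrayList<>();
        out.add(new ArrayList<>(states));
        out.add(new ArrayList<>(results));
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> out = run();
        System.out.println("states  = " + out.get(0));
        System.out.println("results = " + out.get(1));
    }
}
```

The subscribers are attached before recognize() is called because, like DirectProcessor, a SubmissionPublisher does not replay items to subscribers that arrive late. In a Reactor project the same shape should carry over by swapping each publisher for a processor or sink.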
Ricard Kollcaku answered Sep 19 '25 19:09
