From Reactor's reference guide, I learnt that Flux.create() can be used to convert an async callback into a Flux.
However, some callbacks have multiple methods that receive different types of data. Assume I have a piece of code like the one below:
asrService.recognize(new Callback() {
    @Override
    public void stateChange(State state) {
        // consume state
    }
    @Override
    public void onResultData(Result result) {
        // consume result
    }
});
How can I convert it into two reactive streams, Flux<State> and Flux<Result>?
One way is to use processors such as DirectProcessor: create two processors, emit each event to the matching processor from inside the callback, and subscribe to the processors. If you still want to use Flux.create, you can do it like this:
Flux<Object> objectFlux;

@Override
public void run(String... args) throws Exception {
    // Flux.create is cold: this lambda, and therefore recognize(), runs once per subscriber.
    objectFlux = Flux.create(objectFluxSink ->
            asrService.recognize(new Callback() {
                @Override
                public void stateChange(State state) {
                    objectFluxSink.next(state);
                }
                @Override
                public void onResultData(Result result) {
                    objectFluxSink.next(result);
                }
            }));
}

// Keep only Result events and cast them.
public Flux<Result> getResult() {
    return objectFlux.filter(o -> o instanceof Result)
            .map(o -> (Result) o);
}

// Keep only State events and cast them.
public Flux<State> getState() {
    return objectFlux.filter(o -> o instanceof State)
            .map(o -> (State) o);
}
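Because Flux.create invokes its lambda for every subscriber, subscribing to both derived streams would register the callback twice. A minimal sketch of one way around that, assuming a single shared registration is what you want: append share() to the bridged Flux and split it with ofType, which filters and casts in one step. The State, Result, Callback and AsrService types below are hypothetical stand-ins for the ones in the question, and SharedCallbackDemo and bridge are names invented for this illustration.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class SharedCallbackDemo {

    // Hypothetical stand-ins for the question's types.
    static final class State {
        final String name;
        State(String name) { this.name = name; }
    }

    static final class Result {
        final String text;
        Result(String text) { this.text = text; }
    }

    interface Callback {
        void stateChange(State state);
        void onResultData(Result result);
    }

    // Hypothetical service: remembers the callback and counts registrations.
    static final class AsrService {
        Callback callback;
        int recognizeCalls;

        void recognize(Callback callback) {
            this.callback = callback;
            recognizeCalls++;
        }
    }

    // Bridges the callback into one Flux; share() lets all concurrent
    // subscribers reuse a single upstream subscription.
    static Flux<Object> bridge(AsrService service) {
        return Flux.<Object>create(sink -> service.recognize(new Callback() {
            @Override
            public void stateChange(State state) {
                sink.next(state);
            }

            @Override
            public void onResultData(Result result) {
                sink.next(result);
            }
        })).share();
    }

    public static void main(String[] args) {
        AsrService service = new AsrService();
        Flux<Object> events = bridge(service);

        List<String> states = new ArrayList<>();
        List<String> results = new ArrayList<>();

        // ofType keeps only elements of the given class and casts them.
        events.ofType(State.class).subscribe(s -> states.add(s.name));
        events.ofType(Result.class).subscribe(r -> results.add(r.text));

        // Simulate the service firing its callback.
        service.callback.stateChange(new State("LISTENING"));
        service.callback.onResultData(new Result("hello"));

        System.out.println(states + " " + results
                + " recognizeCalls=" + service.recognizeCalls);
    }
}
```

Note that share() only keeps the registration alive while at least one subscriber is attached, and a late subscriber does not see events emitted before it subscribed.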
I still think the processor approach is much cleaner, because you don't need the filtering and casting, but you do need two processors, like this:
DirectProcessor<Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
    @Override
    public void stateChange(State state) {
        stateDirectProcessor.onNext(state);
    }
    @Override
    public void onResultData(Result result) {
        resultDirectProcessor.onNext(result);
    }
});
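DirectProcessor is deprecated as of Reactor 3.4 in favour of the Sinks API, and its Javadoc names Sinks.many().multicast().directBestEffort() as the closest replacement. The same two-stream idea could be sketched with sinks roughly as follows, assuming Reactor 3.4 or later. State, Result and Callback are hypothetical stand-ins for the question's types, and AsrBridge is a name made up for this example.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.ArrayList;
import java.util.List;

class SinksCallbackDemo {

    // Hypothetical stand-ins for the question's types.
    static final class State {
        final String name;
        State(String name) { this.name = name; }
    }

    static final class Result {
        final String text;
        Result(String text) { this.text = text; }
    }

    interface Callback {
        void stateChange(State state);
        void onResultData(Result result);
    }

    // Callback implementation that owns one sink per event type
    // and exposes each sink as a typed Flux.
    static final class AsrBridge implements Callback {
        private final Sinks.Many<State> stateSink =
                Sinks.many().multicast().directBestEffort();
        private final Sinks.Many<Result> resultSink =
                Sinks.many().multicast().directBestEffort();

        @Override
        public void stateChange(State state) {
            // tryEmitNext returns an EmitResult, ignored in this sketch.
            stateSink.tryEmitNext(state);
        }

        @Override
        public void onResultData(Result result) {
            resultSink.tryEmitNext(result);
        }

        Flux<State> states() {
            return stateSink.asFlux();
        }

        Flux<Result> results() {
            return resultSink.asFlux();
        }
    }

    public static void main(String[] args) {
        AsrBridge bridge = new AsrBridge();
        // In real code the bridge would be registered once:
        // asrService.recognize(bridge);

        List<String> states = new ArrayList<>();
        List<String> results = new ArrayList<>();
        bridge.states().subscribe(s -> states.add(s.name));
        bridge.results().subscribe(r -> results.add(r.text));

        // Simulate the service invoking the callback.
        bridge.stateChange(new State("LISTENING"));
        bridge.onResultData(new Result("hello"));
        bridge.onResultData(new Result("world"));

        System.out.println(states + " " + results);
    }
}
```

As with DirectProcessor, events emitted while nobody is subscribed are not replayed later. If the callback can fire from several threads at once, the EmitResult should be checked or emitNext used with a failure handler, since sinks reject concurrent emissions.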