I have a method that returns Mono<Output>:
interface Processor {
    Mono<Output> process(Input input);
}
And I want to execute this processor method for a collection:
List<Input> inputs = // get inputs
Processor processor = // get processor
List<Mono<Output>> outputs = inputs.stream().map(processor::process).collect(toList());
But instead of a List<Mono<Output>> I want to get a Mono<List<Output>> that will contain the aggregated results.
I tried reduce, but the final result looks very clumsy:
Mono<List<Output>> result = inputs.stream().map(processor::process)
    .reduce(Mono.just(new ArrayList<>()),
        (monoListOfOutput, monoOfOutput) ->
            monoListOfOutput.flatMap(list -> monoOfOutput.map(output -> {
                list.add(output);
                return list;
            })),
        (left, right) ->
            left.flatMap(leftList -> right.map(rightList -> {
                leftList.addAll(rightList);
                return leftList;
            })));
Can I achieve this with less code?
Calling block() is the blocking way to get the object out of a Mono: it subscribes and makes the calling thread wait until the value is emitted.
You can use the next() method to convert a Flux to a Mono. If the Flux has more than one value, the newly created Mono will contain only the first one.
A non-blocking way is to use one of the overloaded subscribe() methods. For example, subscribe(Consumer<? super T> consumer) hands the data from the Mono to a callback. With subscribe(), the current thread is not blocked waiting for the Publisher to emit data.
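To make the difference between block(), subscribe() and next() concrete, here is a minimal sketch. The class and method names (MonoAccessSketch, readBlocking, readAsync, firstOf) are made up for illustration, and it assumes reactor-core is on the classpath.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class, not part of Reactor.
class MonoAccessSketch {

    // Blocking: the calling thread waits until the Mono emits or completes.
    static String readBlocking(Mono<String> mono) {
        return mono.block();
    }

    // Non-blocking: registers a callback that stores the value when it arrives.
    // The caller does not wait; for a Mono backed by asynchronous work the
    // callback may run after this method has already returned.
    static void readAsync(Mono<String> mono, AtomicReference<String> target) {
        mono.subscribe(target::set);
    }

    // next() turns a Flux into a Mono that carries only the first element.
    static Mono<String> firstOf(Flux<String> flux) {
        return flux.next();
    }

    public static void main(String[] args) {
        System.out.println(readBlocking(Mono.just("hello")));
        Mono.just("hello").subscribe(value -> System.out.println("received " + value));
        System.out.println(firstOf(Flux.just("a", "b", "c")).block());
    }
}
```

In reactive code you would normally keep composing operators and let the framework subscribe, rather than calling block() yourself.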
Mono is comparable to the Optional class in Java since it contains 0 or 1 value, and Flux is comparable to List since it can have N values.
If you don't need to create a stream for any other reason, you can create a Flux from your inputs, map it, and collect the results into a list:
Flux.fromIterable(inputs).flatMap(processor::process).collectList();
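One thing to keep in mind with the one-liner above: flatMap subscribes to the inner Monos eagerly and emits results as they arrive, so the list may not follow the order of the inputs when the Monos complete asynchronously. If order matters, flatMapSequential (or concatMap, which runs them one at a time) can be used instead. A minimal sketch, assuming reactor-core is available and using Integer and String as stand-ins for Input and Output (the class and method names are hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; Integer/String stand in for Input/Output.
class CollectListSketch {

    interface Processor {
        Mono<String> process(Integer input);
    }

    // Runs the processor for every input and gathers the results
    // into a single list, keeping the order of the inputs.
    static Mono<List<String>> processAll(List<Integer> inputs, Processor processor) {
        return Flux.fromIterable(inputs)
                .flatMapSequential(processor::process)
                .collectList();
    }

    public static void main(String[] args) {
        Processor processor = input -> Mono.just("out-" + input);
        // block() is used here only to print the result in a demo.
        System.out.println(processAll(List.of(1, 2, 3), processor).block());
        // prints [out-1, out-2, out-3]
    }
}
```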
// first merge all the `Mono`s:
List<Mono<Output>> outputs = ...
Flux<Output> merged = Flux.empty();
for (Mono<Output> out : outputs) {
    merged = merged.mergeWith(out);
}
// then collect them
return merged.collectList();
Or (inspired by Alexander's answer):
Flux.fromIterable(outputs).flatMap(x -> x).collectList();
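If you already hold a List<Mono<Output>>, the static Flux.merge and Flux.concat factories accept an Iterable of publishers directly, so the loop can probably be dropped. This is a different technique from the answers above, sketched here under the assumption that reactor-core is on the classpath; the class and method names are made up. Flux.concat subscribes to the Monos one after another and so keeps list order, while Flux.merge subscribes eagerly and emits in arrival order.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class, not part of Reactor.
class MergeListSketch {

    // Sequential subscription: results follow the order of the list.
    static <T> Mono<List<T>> collectInOrder(List<Mono<T>> monos) {
        return Flux.concat(monos).collectList();
    }

    // Eager subscription: results are collected in the order they arrive.
    static <T> Mono<List<T>> collectAsTheyArrive(List<Mono<T>> monos) {
        return Flux.merge(monos).collectList();
    }

    public static void main(String[] args) {
        List<Mono<String>> outputs = List.of(Mono.just("a"), Mono.just("b"));
        System.out.println(collectInOrder(outputs).block());
        System.out.println(collectAsTheyArrive(outputs).block());
    }
}
```

In both variants a Mono that completes empty simply contributes nothing to the list, and an error in any Mono fails the whole result.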