Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to convert List<Mono<T>> to Mono<List<T>>?

I have a method that returns Mono<Output>:

interface Processor {
  Mono<Output> process(Input input);
}

And I want to execute this processor method for a collection:

List<Input> inputs = // get inputs
Processor processor = // get processor
List<Mono<Output>> outputs = inputs.stream().map(supplier::supply).collect(toList());

But instead of a List<Mono<Output>> I want to get Mono<List<Output>> that will contain aggregated results.

I tried reduce, but the final result looks very clumsy:

Mono<List<Output>> result = inputs.stream().map(processor::process)
    .reduce(Mono.just(new ArrayList<>()),
        (monoListOfOutput, monoOfOutput) ->
            monoListOfOutput.flatMap(list -> monoOfOutput.map(output -> {
              list.add(output);
              return list;
            })),
        (left, right) ->
            left.flatMap(leftList -> right.map(rightList -> {
              leftList.addAll(rightList);
              return leftList;
            })));

Can I achieve this with less code?

like image 708
Ilya Zinkovich Avatar asked Nov 09 '18 18:11

Ilya Zinkovich


People also ask

How do I get a list from Mono list?

Using block() is actually the only way to get the object from a Mono when it is emitted.

How do you convert Flux objects to Mono?

You can use the next() method to convert Flux to Mono. If Flux has more than one value, the newly created Mono will contain the first value. That's it!

How do you get an object from mono without blocking?

A non-blocking way would be via one of the overloaded subscribe() methods. In this example, we will use the subscribe(Consumer<? super T> consumer) to get the data from Mono asynchronously. With subscribe(), the current thread will not be blocked waiting for the Publisher to emit data.

What is the difference between mono and Flux?

Mono is more relatable to the Optional class in Java since it contains 0 or 1 value, and Flux is more relatable to List since it can have N number of values.


2 Answers

If you don't have to create stream for any reason, you could create Flux from your inputs, map it and collect list

Flux.fromIterable(inputs).flatMap(processor::process).collectList();
like image 123
Alexander Pankin Avatar answered Oct 06 '22 01:10

Alexander Pankin


// first merge all the `Mono`s:
List<Mono<Output>> outputs = ...
Flux<Output> merged = Flux.empty();
for (Mono<Output> out : outputs) {
    merged = merged.mergeWith(out);
}

// then collect them
return merged.collectList();

or (inspired by Alexander's answer)

Flux.fromIterable(outputs).flatMap(x -> x).collectList();
like image 33
Alexey Romanov Avatar answered Oct 06 '22 00:10

Alexey Romanov