
How do I sum the values in a Reactor Flux stream?

Suppose I have a repository with a findAll() method that returns an Iterable of State, where State is a class representing a US state with two fields (each with a getter and setter): name and population.

I want to get the sum of the population fields for all States in my Flux. I create a Flux from an Iterable, as follows:

Flux<State> f = Flux.fromIterable(stateRepo.findAll());

I've got my Flux, but I don't know of a good way to sum up its values. I've tried something like

int total = 0;
f.map(s -> s.getPopulation()).subscribe(p -> total += p);
return total;

However, the compiler says that total "should be final or effectively final". Adding final obviously won't work, because I'm trying to add to it.
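
The error arises because a Java lambda can only capture local variables that are final or effectively final, so it cannot reassign total. The sketch below shows the difference in plain Java, assuming nothing beyond the JDK: java.util streams stand in for the Flux, and the class and method names are hypothetical. A mutable holder compiles, but it only gives the right answer here because forEach runs synchronously; a reduction avoids the shared state altogether.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; it uses plain JDK collections, not Reactor.
class CaptureDemo {

    // Workaround: the lambda captures the holder reference (which is effectively
    // final) and mutates the object behind it. This is correct here only because
    // forEach finishes before the return statement runs.
    static int sumWithHolder(List<Integer> populations) {
        AtomicInteger total = new AtomicInteger();
        populations.forEach(p -> total.addAndGet(p));
        return total.get();
    }

    // Preferred: express the aggregate as a reduction with no shared mutable state.
    static int sumWithReduce(List<Integer> populations) {
        return populations.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> populations = List.of(10, 20, 30);
        System.out.println(sumWithHolder(populations)); // prints 60
        System.out.println(sumWithReduce(populations)); // prints 60
    }
}
```

With a Flux the holder approach is more fragile still, because subscribe may return before every element has been delivered.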

How do I compute a sum (or any other aggregate) over a Flux?

Mark asked Feb 27 '18 21:02


2 Answers

Use the reduce method:

@GetMapping("/populations")
public Mono<Integer> getPopulation() {
    return Flux.fromIterable(stateRepo.findAll())
            .map(s -> s.getPopulation())
            .reduce(0, (x1, x2) -> x1 + x2)
            .map(this::someFunction); // here you can handle the sum
}
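
For trying this outside a controller, here is a self-contained sketch of the same reduction, assuming reactor-core is on the classpath. The State class, the sample figures, and the class name PopulationSum are hypothetical stand-ins for the repository in the question. block() appears only so that a plain main method can print the result; a reactive handler would return the Mono instead.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
class PopulationSum {

    // Hypothetical stand-in for the State entity described in the question.
    static class State {
        private final String name;
        private final int population;

        State(String name, int population) {
            this.name = name;
            this.population = population;
        }

        String getName() { return name; }
        int getPopulation() { return population; }
    }

    // Emits the sum once the source completes; an empty source yields the seed 0.
    static Mono<Integer> totalPopulation(Iterable<State> states) {
        return Flux.fromIterable(states)
                .map(State::getPopulation)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<State> states = List.of(new State("A", 100), new State("B", 250));
        // block() is for demonstration only.
        System.out.println(totalPopulation(states).block()); // prints 350
    }
}
```

If the total could exceed the int range, mapping to long and reducing with Long::sum from a seed of 0L would be the analogous change.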
mroman answered Sep 17 '22 14:09


You can add the reactor-extra package from Maven:

io.projectreactor.addons:reactor-extra

and then use MathFlux.sumInt(integersFlux), which returns a Mono<Integer>.
Docs: https://projectreactor.io/docs/core/release/reference/#extra-math
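
A minimal sketch of that call, assuming both reactor-core and reactor-extra are on the classpath and that MathFlux is imported from the reactor.math package. The class name MathFluxSum and the sample values are hypothetical, and block() is used only so that main can print the result.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.math.MathFlux;

// Hypothetical demo class; requires reactor-core and reactor-extra.
class MathFluxSum {

    // Delegates the aggregation to MathFlux instead of a hand-written reduce.
    static Mono<Integer> sum(Flux<Integer> populations) {
        return MathFlux.sumInt(populations);
    }

    public static void main(String[] args) {
        // block() is for demonstration only.
        System.out.println(sum(Flux.just(100, 250, 50)).block()); // prints 400
    }
}
```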

Slava answered Sep 17 '22 14:09