Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Group by object property in java flux

Given the following data structure Data and Flux<Data> what is idiomatic way to achieve grouping into series of lists based on some property:

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

class Scratch {
    private static class Data {
        private Integer key;
        private String value;

        public Data(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }

        public static Data of(Integer key, String value) {
            return new Data(key, value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    public static void main(String[] args) {
        Flux<Data> test = Flux.just(
                Data.of(1, "Hello"),
                Data.of(1, "world"),
                Data.of(2, "How"),
                Data.of(2, "are"),
                Data.of(2, "you"),
                Data.of(3, "Bye"));
        test.bufferUntil(new Predicate<Data>() {
            Integer prev = null;
            @Override
            public boolean test(Data next) {
                boolean collect = prev != null && !Objects.equals(prev, next.getKey());
                prev = next.getKey();
                return collect;
            }
        }, true).subscribe(e -> System.out.println(e.toString()));
    }
} 

Output:

[Hello, world]
[How, are, you]
[Bye]

I am aware of groupBy function on Flux, but this gives me again a Flux, not a list. Current solution I have described above works, but it does not feel 100% idiomatic because I had to use anonymous class instead of lambda. I could have use lambda and AtomicReference outside from lambda, but that too does not feel 100% right. Any suggestions?

like image 303
Sasa Avatar asked Sep 20 '18 20:09

Sasa


People also ask

What is grouped flux?

A GroupedFlux , like a Flux , must be subscribed to to become active. So what you need to do is somehow consume the inner Flux that groupBy produces. One typical way of doing that is by using flatMap , which already takes a Function<T, Flux> transformation. The function can be as simple as Function.

What is flux object in Java?

A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements. This difference in the semantics of these two streams is very useful, as for example making a request to a http server expects to receive 0 or 1 response, it would be inappropriate to use a Flux in this case.

What is group of object in Java?

The groupingBy() method of Collectors class in Java are used for grouping objects by some property and storing results in a Map instance. In order to use it, we always need to specify a property by which the grouping would be performed.

What is flux zip?

zip() Flux has multiple options to combine publishers. One of them is the zip operator. Zip waits for each source to emit one element and combines these elements. The output emitted is a tuple with as many publishers wrapped inside the zip.


1 Answers

Here is a solution using groupBy operator. I have grouped the data by the common key. The groupBy operator gives me a Flux of GroupedFlux. GroupedFlux is a subclass of Flux, so I apply flatMap and convert an individual groupedFlux to a List<Data> using the collectList operator. Like this, I get a Flux<List<Data>>, which I then subscribe to and print, as asked by you.

test.groupBy(Data::getKey)
                .flatMap(Flux::collectList)
                .subscribe(listOfStringsHavingDataWithSameKey -> System.out.println(listOfStringsHavingDataWithSameKey.toString()));

Do checkout the documentations for Flux and GroupedFlux.

like image 162
uneq95 Avatar answered Oct 04 '22 11:10

uneq95