Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Scatter-gather: combine set of Mono<List<Item>> into single Mono<List<Item>>

Can I combine a list of Mono<List<Item>> data sources into a single Mono<List<Item>> containing all items without blocking?

In my JDK 9 Spring Boot 2 with Lombok scatter-gather application, this blocking version works:

    private Mono<List<Item>> gather(List<Mono<List<Item>>> data) {
        return Mono.just( data.stream().map(m -> m.block())
                .flatMap(List::parallelStream).collect(Collectors.toList()));
    }

Each source data stream invokes block() on its Mono; I'd like to reduce the block() calls if possible...preferably to zero. Any ideas?

Test case

@RunWith(SpringRunner.class)
public class ReactiveTests {
    @Test
    public void testScatterGather() {
        List<List<Item>> dataSet = dataSet();
        Mono<List<Item>> data = gather(scatter(dataSet));
        StepVerifier.create(data)
            .expectNext(toItemList(dataSet))
            .expectComplete();
    }

    private Mono<List<Item>> gather(List<Mono<List<Item>>> data) {
        return Mono.just( data.stream().map(m -> m.block())
                .flatMap(List::parallelStream).collect(Collectors.toList()));
    }

    private List<Mono<List<Item>>> scatter(List<List<Item>> data) {
        return newMonoLists(data);
    }

    private List<Item> toItemList(List<List<Item>> data) {
        return data.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    private List<Mono<List<Item>>> newMonoLists(List<List<Item>> data) {
        return data.stream().map(l -> Mono.just(l)).collect(Collectors.toList());
    }

    private List<List<Item>> dataSet() {
        return Arrays.asList(dataSet(1L),dataSet(4L),dataSet(7L));
    }

    private List<Item> dataSet(long id) {
        return Arrays.asList(new Item(id), new Item(id+1), new Item(id+2));
    }
    @Data @AllArgsConstructor private static class Item { private Long id; }
}
like image 935
Jan Nielsen Avatar asked Oct 26 '25 06:10

Jan Nielsen


1 Answers

Suppose you have 2 Mono sources having a List of String.

Mono<List<String>> listMono1 = Mono.just(Arrays.asList("1","3","5","7","9","11"));
Mono<List<String>> listMono2 = Mono.just(Arrays.asList("2","4","6","8","10","12"));

You can merge two publishers using the method Flux.merge() which takes in a bunch of publishers and merges them together.

But if you merge both the publishers like below, then it will finally give you List<List<String>> , which you don't want.

Flux.merge(listMono1, listMono2).collectList();

So you have to modify the listMono1 and listMono2 such that they emit individual elements of the List and then merge both of them. To do that you can use Mono.flatMapMany() which will help you emit all the elements from the List. So, finally you will get the combined List as follows:

Flux.merge(listMono1.flatMapMany(Flux::fromIterable),listMono2.flatMapMany(Flux::fromIterable)).collectList();

Hope its what you want finally!

like image 152
uneq95 Avatar answered Oct 28 '25 21:10

uneq95



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!