So my goal is to use WebClient to make multiple concurrent requests, wait until they're all completed, then combine the results. Here is what I have so far:
...
Flux<ServerResponse> feedResponses = request
        .bodyToMono(AddFeedRequestDto.class)
        .map(AddFeedRequestDto::getFeeds) // Returns a list of RSS feed URLs
        .map(this::getServerResponsesFromUrls) // Returns a list of Mono<Feed>
        .map(Flux::merge) // Wait til all requests are completed
        // Not sure where to go from here
...
/** Related methods: **/
private List<Mono<Feed>> getServerResponsesFromUrls(List<String> feedUrls) {
    List<Mono<Feed>> feedResponses = new ArrayList<>();
    feedUrls.forEach(feedUrl -> feedResponses.add(getFeedResponse(feedUrl)));
    return feedResponses;
}
public Mono<Feed> getFeedResponse(final String url) {
    return webClient
            .get()
            .uri(url)
            .retrieve()
            .bodyToMono(String.class) // Ideally, we should be able to use bodyToMono(FeedDto.class)
            .map(this::convertResponseToFeedDto)
            .map(feedMapper::convertFeedDtoToFeed);
}
/** Feed.java **/
@Getter
@Setter
public class Feed {
    List<Item> items;
}
Basically my goal is to combine all the items from each of the feeds to create one unified feed. However, I am not quite sure what to do after the call to Flux::merge. Any suggestions would be appreciated.
Use .flatMap instead of .map / Flux.merge, like this:
Mono<Feed> unifiedFeedMono = request
        .bodyToMono(AddFeedRequestDto.class)  // Mono<AddFeedRequestDto>
        .map(AddFeedRequestDto::getFeeds)     // Mono<List<String>> feedUrls
        .flatMapMany(Flux::fromIterable)      // Flux<String> feedUrls
        .flatMap(this::getFeedResponse)       // Flux<Feed>
        .map(Feed::getItems)                  // Flux<List<Item>>
        .flatMap(Flux::fromIterable)          // Flux<Item>
        .collectList()                        // Mono<List<Item>>
        .map(Feed::new);                      // Mono<Feed>
Note that .flatMap is an asynchronous operation and will execute requests in parallel.  There is an overloaded version that takes a concurrency argument if you want to limit concurrency.
Ordering is not guaranteed with .flatMap, and the resulting items might be interleaved.  If you want more ordering guarantees, substitute .concatMap or .flatMapSequential.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With