
Spring Boot 2.0 WebFlux: Merge multiple Mono<String>, where each string is JSON serialized to a string, into a single Flux<String>

I have three Mono<String> instances, each holding a JSON string, as shown below:

Mono<String> strInventoryResp = invWebClient.get()
        .uri("/findATSInventory?skuId=" + skuId)
        .exchange()
        .flatMap(resp -> resp.bodyToMono(String.class));

Mono<String> strProductResponse = productClient.get()
        .uri("/v2/products/id/" + skuId)
        .exchange()
        .flatMap(resp -> resp.bodyToMono(String.class));

Mono<String> strItemResp = productClient.get()
        .uri("/v2/items?id=" + skuId)
        .exchange()
        .flatMap(resp -> resp.bodyToMono(String.class));

I want to merge them into a Flux of JSON strings such that the combined result is also valid JSON.

I have tried the static Flux.merge method, shown below, but it simply emits the three strings one after another, so the output is not a single valid JSON document:

Flux.merge(strProductResponse,strItemResp,strInventoryResp);

How do I return a Flux of the combined Mono responses so that a valid stream of JSON is returned to the browser when I invoke the controller that calls this method?

EDIT: My goal is to invoke those three APIs asynchronously using WebFlux and combine the results into one. The controller will call this method and return the combined result to a UI. Is there an alternative approach?

chirag asked Mar 05 '23 21:03


2 Answers

This is how I would solve it.

@Test
public void buildResponse() {
    final Mono<String> customerName = Mono.just("customer name");
    final Mono<String> customerPreference = Mono.just("customer preference");
    final Mono<String> customerShippingInformation = Mono.just("customer shipping information");

    // Seed a Builder from the first Mono, then zip in each further Mono
    // and copy its value into the Builder before building the response object.
    final Mono<JsonObjectYouWantToReturn> returnThisAsAResponse = customerName
            .map(Builder::new)
            .zipWith(customerPreference)
            .map(t -> t.getT1().withCustomerPreference(t.getT2()))
            .zipWith(customerShippingInformation)
            .map(t -> t.getT1().withCustomerShippingInformation(t.getT2()))
            .map(Builder::build);
}

private class Builder {
    private String customerName;
    private String customerPreference;
    private String customerShippingInfo;

    public Builder(String customerName) {
        this.customerName = customerName;
    }

    public Builder withCustomerPreference(String customerPreference) {
        this.customerPreference = customerPreference;
        return this;
    }

    public Builder withCustomerShippingInformation(String t3) {
        this.customerShippingInfo = t3;
        return this;
    }


    public JsonObjectYouWantToReturn build() {
        return new JsonObjectYouWantToReturn(customerName, customerPreference, customerShippingInfo);
    }
}

private class JsonObjectYouWantToReturn {

    public final String customerName;
    public final String customerPreference;
    public final String customerShippingInfo;


    public JsonObjectYouWantToReturn(String customerName, String customerPreference, String customerShippingInfo) {
        this.customerName = customerName;
        this.customerPreference = customerPreference;
        this.customerShippingInfo = customerShippingInfo;
    }
}
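
When the three upstream bodies are already valid JSON, as in the question, the final mapping step could also wrap them verbatim in one JSON object instead of filling a POJO. Below is a minimal JDK-only sketch of that step; the class name JsonCombiner and the keys inventory, product and items are made up for illustration, and the inputs are assumed to be valid JSON (they are not parsed or validated). In a WebFlux handler it would presumably be called from something like Mono.zip(strInventoryResp, strProductResponse, strItemResp).map(t -> JsonCombiner.combine(t.getT1(), t.getT2(), t.getT3())).

```java
// Hypothetical helper, not part of the original answer.
final class JsonCombiner {
    private JsonCombiner() {}

    // Wraps three JSON documents in a single JSON object.
    // Assumption: each argument is already valid JSON; it is embedded as-is.
    static String combine(String inventoryJson, String productJson, String itemsJson) {
        return "{\"inventory\":" + inventoryJson
                + ",\"product\":" + productJson
                + ",\"items\":" + itemsJson + "}";
    }

    public static void main(String[] args) {
        // Sample payloads are invented for the demonstration.
        System.out.println(combine("{\"ats\":5}", "{\"id\":\"sku-1\"}", "[]"));
    }
}
```

Because the result is a single Mono<String>, the controller returns one JSON document rather than three concatenated ones, which is what Flux.merge alone produces.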

piotr szybicki answered Mar 09 '23 01:03


Another solution, similar to Piotr's, but using reduce, Lombok, and an immutable aggregator.

@Test
public void test() {
    Mono<String> strInventoryResp = Mono.just("strInventoryResp");
    Mono<String> strProductResponse = Mono.just("strProductResponse");
    Mono<String> strItemResp = Mono.just("strItemResp");

    Mono<Aggregator> result = Flux.merge(
            strInventoryResp.map(s -> Aggregator.builder().strInventoryResp(s).build()),
            strItemResp.map(s -> Aggregator.builder().strItemResp(s).build()),
            strProductResponse.map(s -> Aggregator.builder().strProductResponse(s).build()))
            .reduce((aggregator, aggregator2) -> aggregator.toBuilder()
                    .strInventoryResp(Optional.ofNullable(aggregator2.strInventoryResp)
                            .orElseGet(() -> aggregator.strInventoryResp))
                    .strProductResponse(Optional.ofNullable(aggregator2.strProductResponse)
                            .orElseGet(() -> aggregator.strProductResponse))
                    .strItemResp(Optional.ofNullable(aggregator2.strItemResp)
                            .orElseGet(() -> aggregator.strItemResp))
                    .build());

    // result now emits a single, fully populated Aggregator that you can serialize to JSON
}

@Builder(toBuilder = true)
private static class Aggregator {
    private final String strInventoryResp;
    private final String strProductResponse;
    private final String strItemResp;
}
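
Flux.merge emits the partial aggregators in whatever order the responses arrive, so the reducer has to give the same result for any ordering. The sketch below isolates that field-wise merge using only the JDK: java.util.stream.Stream.reduce stands in for Flux.reduce, and a hand-written class stands in for the Lombok builder. The names Partial, mergeWith and combine are illustrative, not taken from the answer.

```java
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical stand-in for the Lombok-generated Aggregator.
final class Partial {
    final String inventory;
    final String product;
    final String item;

    Partial(String inventory, String product, String item) {
        this.inventory = inventory;
        this.product = product;
        this.item = item;
    }

    // Field-wise merge: for each field keep the non-null value,
    // preferring the one from `other` when both sides are set.
    Partial mergeWith(Partial other) {
        return new Partial(
                other.inventory != null ? other.inventory : inventory,
                other.product != null ? other.product : product,
                other.item != null ? other.item : item);
    }

    // Folds any number of partial results into one; empty input yields Optional.empty().
    static Optional<Partial> combine(Stream<Partial> parts) {
        return parts.reduce(Partial::mergeWith);
    }
}
```

Since each partial result sets exactly one field, the merged value does not depend on which response arrives first.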

One remark about your example: it looks like you are using multiple web clients, which is bad practice. Prefer a single web client per application because of thread usage (multiple web clients will create many threads).

Alexander Pankin answered Mar 08 '23 23:03