
How to execute blocking calls within a Spring Webflux / Reactor Netty web application

My use case is a Spring WebFlux microservice running on Reactor Netty, with the following dependencies:

  • org.springframework.boot.spring-boot-starter-webflux (2.0.1.RELEASE)
  • org.springframework.boot.spring-boot-starter-data-mongodb-reactive (2.0.1.RELEASE)
  • org.projectreactor.reactor-spring (1.0.1.RELEASE)

For a very specific case I need to retrieve some information from my Mongo database and turn it into query parameters that are sent with my reactive WebClient. As neither the WebClient nor the UriComponentsBuilder accepts a Publisher (Mono / Flux), I used a #block() call to get the results.

Since reactor-core (version 0.7.6.RELEASE), which is included in the latest spring-boot-dependencies (version 2.0.1.RELEASE), this is no longer possible. The call now fails with: "block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx" (see https://github.com/reactor/reactor-netty/issues/312).
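A toy model can make this error concrete. The sketch below uses only the JDK and is not Reactor's code: `NonBlockingMarker`, `EventLoopThread`, `guardedBlock` and `tryOnEventLoop` are hypothetical names, and the assumption is simply that the library refuses to wait for a result when the calling thread is one that must never be parked, such as a Netty event-loop thread.

```java
import java.util.concurrent.CompletableFuture;

// Toy model only: all names here are hypothetical, not Reactor APIs.
class BlockingGuardSketch {

    /** Marker for threads that must never be parked (think: an event-loop thread). */
    interface NonBlockingMarker {}

    /** A thread type that carries the marker. */
    static class EventLoopThread extends Thread implements NonBlockingMarker {
        EventLoopThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /** Waits for the future, but refuses to do so on a marked thread. */
    static <T> T guardedBlock(CompletableFuture<T> future) {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                "blocking is not supported in thread " + current.getName());
        }
        return future.join();
    }

    /** Calls guardedBlock on a marked thread and reports what happened. */
    static String tryOnEventLoop() {
        String[] outcome = new String[1];
        Thread t = new EventLoopThread(() -> {
            try {
                outcome[0] = "ok: " + guardedBlock(CompletableFuture.completedFuture("value"));
            } catch (IllegalStateException e) {
                outcome[0] = "rejected: " + e.getMessage();
            }
        }, "event-loop-1");
        t.start();
        try {
            t.join(); // wait for the worker so its outcome is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        // On an ordinary thread the blocking call is allowed.
        System.out.println(guardedBlock(CompletableFuture.completedFuture("value")));
        // On a marked thread it is rejected with an IllegalStateException.
        System.out.println(tryOnEventLoop());
    }
}
```

In this model the same call succeeds or fails depending only on which thread makes it, which would explain why code that used to work can start failing after a library upgrade adds such a guard.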

My code snippet:

public Mono<FooBar> getFooBar(Foo foo) {
    MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
    parameters.add("size", foo.getSize());
    parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
        .map(Bar::toString)
        .collectList()
        .block());

    String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
        .port(8081)
        .path("/foo-bar")
        .queryParams(parameters)
        .build()
        .toString();

    return webClient.get()
        .uri(url)
        .retrieve()
        .bodyToMono(FooBar.class);
}

This worked with spring-boot version 2.0.0.RELEASE, but since the upgrade to version 2.0.1.RELEASE, and hence the upgrade of reactor-core to version 0.7.6.RELEASE, it is not allowed anymore.

The only real solution I see is to include a blocking (non-reactive) repository / Mongo client as well, but I'm not sure if that is encouraged. Any suggestions?

asked Apr 10 '18 06:04 by Dennis Nijssen



1 Answer

The WebClient does not accept a Publisher type for its request URL, but nothing prevents you from doing the following:

public Mono<FooBar> getFooBar(Foo foo) {

    Mono<List<String>> bars = barReactiveCrudRepository
        .findAllByIdentifierIn(foo.getBarIdentifiers())
        .map(Bar::toString)
        .collectList();

    Mono<FooBar> foobar = bars.flatMap(b -> {

        MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
        parameters.add("size", foo.getSize());
        parameters.addAll("bars", b);

        String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
            .port(8081)
            .path("/foo-bar")
            .queryParams(parameters)
            .build()
            .toString();

        return webClient.get()
            .uri(url)
            .retrieve()
            .bodyToMono(FooBar.class);
    });
    return foobar;
}

If anything, this new reactor-core inspection saved you from crashing your whole application with this blocking call in the middle of a WebFlux handler.
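The shape of this fix (wait for nothing, chain the next asynchronous step onto the previous one) can also be sketched with the JDK alone. This is an analogy only, with `CompletableFuture.thenCompose` playing the role of `flatMap`. Every name below (`findBars`, `buildUrl`, `fetch`, and the hard-coded URL) is a hypothetical stand-in for the repository, the UriComponentsBuilder and the WebClient call. It assumes Java 10 or later for `URLEncoder.encode(String, Charset)`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// JDK-only analogy; every name below is hypothetical.
class ComposeSketch {

    /** Stand-in for the reactive repository query: yields the "bars" asynchronously. */
    static CompletableFuture<List<String>> findBars(List<String> identifiers) {
        return CompletableFuture.supplyAsync(() ->
            identifiers.stream().map(id -> "bar-" + id).collect(Collectors.toList()));
    }

    /** Pure, synchronous step: turn values that are already available into a URL. */
    static String buildUrl(String size, List<String> bars) {
        String barParams = bars.stream()
            .map(b -> "bars=" + encode(b))
            .collect(Collectors.joining("&"));
        String query = "size=" + encode(size) + (barParams.isEmpty() ? "" : "&" + barParams);
        return "https://base-url:8081/foo-bar?" + query;
    }

    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    /** Stand-in for the HTTP call: asynchronously returns a response body. */
    static CompletableFuture<String> fetch(String url) {
        return CompletableFuture.supplyAsync(() -> "response for " + url);
    }

    /** Chains the steps; nothing inside the pipeline waits for a result. */
    static CompletableFuture<String> getFooBar(String size, List<String> identifiers) {
        return findBars(identifiers)
            .thenApply(bars -> buildUrl(size, bars)) // synchronous transform
            .thenCompose(ComposeSketch::fetch);      // asynchronous step, flattened
    }

    public static void main(String[] args) {
        // join() appears only here, at the outer edge of the program, to print the result.
        System.out.println(getFooBar("10", Arrays.asList("a", "b")).join());
    }
}
```

The point of the design is that the URL is built inside the chain, once the list exists, so the method can return the composed future immediately, just as the rewritten `getFooBar` returns a `Mono<FooBar>` without calling `block()`.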

answered Sep 22 '22 05:09 by Brian Clozel