
"block()/blockFirst()/blockLast() are blocking" error when calling bodyToMono after exchange()

I am trying to use WebFlux to stream a generated file to another location. However, if the file generation runs into an error, the API still returns success, but with a DTO detailing the errors instead of the file itself. This is a very old and poorly designed API, so please excuse the use of POST and the API design.

The response from the API call (exchange()) is a ClientResponse. From there I can either convert the body to a ByteArrayResource using bodyToMono, which can be streamed to a file, or, if there was an error creating the file, convert it to the DTO, also using bodyToMono. However, I cannot seem to do one or the other depending on the content type header of the ClientResponse.

At runtime I get an IllegalStateException caused by:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-client-epoll-12

I think my issue is that I cannot call block() twice in the same function chain.

My code snippet is like so:

    webClient.post()
            .uri(uriBuilder -> uriBuilder.path("/file/")
                                          .queryParams(params).build())
            .exchange()
            .doOnSuccess(cr -> {
                    if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                        NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                        createErrorFile(dto);
                    }
                    else {
                        ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                        createSpreadsheet(bAr);
                    }
                }
            )
            .block();

Basically I want to process the ClientResponse differently based on the MediaType which is defined in the header.

Is this possible?

DaithiG asked Jul 20 '18 20:07




1 Answer

First, a few things that will help you understand the code snippet solving this use case.

  1. You should never call a blocking method within a method that returns a reactive type. Doing so blocks one of the few threads of your application, which is very bad for its throughput.
  2. In any case, as of Reactor 3.2, blocking on a non-blocking thread (such as the reactor-http-client-epoll-12 thread in your error message) throws an IllegalStateException.
  3. Calling subscribe, as suggested in the comments, is not a good idea either. It is more or less like starting that job as a task in a separate thread. You'll get a callback when it's done (the subscribe methods can be given lambdas), but you're in fact decoupling that task from your current pipeline. In this case, the client HTTP response could be closed and its resources cleaned up before you get a chance to read the full response body and write it to a file.
  4. If you don't want to buffer the whole response in memory, Spring provides DataBuffer (think ByteBuffer instances that can be pooled).
  5. You can call block if the method you're implementing is itself blocking (returning void, for example), such as in a test case.
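Points 1, 2, and 5 can be reproduced outside of WebClient. The following is a minimal sketch, assuming reactor-core 3.2 or later is on the classpath; the class and method names are made up for illustration. Mono.delay emits on Reactor's parallel scheduler, whose threads are marked as non-blocking, so it stands in for the Netty event loop thread from the question.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the original question or answer.
public class BlockingCheckDemo {

    // Calls block() inside an operator callback that runs on a
    // non-blocking thread; Reactor is expected to reject this.
    static String blockInsidePipeline() {
        try {
            Mono.delay(Duration.ofMillis(10))
                // hide() keeps Reactor from short-circuiting block() on a constant Mono
                .doOnNext(tick -> Mono.just("body").hide().block())
                .block(); // allowed: the calling thread is a regular, blocking-capable one
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Composes the inner Mono with flatMap instead of blocking on it.
    static String composeInsidePipeline() {
        return Mono.delay(Duration.ofMillis(10))
                .flatMap(tick -> Mono.just("body").hide())
                .block(); // single block() at the outermost, blocking caller
    }

    public static void main(String[] args) {
        System.out.println(blockInsidePipeline());
        System.out.println(composeInsidePipeline());
    }
}
```

The first method should report the same kind of "block()/blockFirst()/blockLast() are blocking" message as in the question, while the second returns the inner value, because the only block() call sits at the outer edge of the pipeline.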

Here's a code snippet that you could use to do this:

    Mono<Void> fileWritten = WebClient.create().post()
            .uri(uriBuilder -> uriBuilder.path("/file/").build())
            .exchange()
            .flatMap(response -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                    Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                    return createErrorFile(dto);
                }
                else {
                    Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                    return createSpreadsheet(body);
                }
            });
    // Once you get that Mono, you should plug it into an existing
    // reactive pipeline, or call block on it, depending on the situation

As you can see, we're not blocking anywhere and methods dealing with I/O are returning Mono<Void>, which is the reactive equivalent of a done(error) callback that signals when things are done and if an error happened.

Since I'm not sure what the createErrorFile method should do, I've provided a sample for createSpreadsheet that just writes the body bytes to a file. Note that since DataBuffer instances might be recycled/pooled, we need to release them once we're done.

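    private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
        try {
            Path file = //...
            WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
            return DataBufferUtils.write(body, channel)
                    .map(DataBufferUtils::release) // release each pooled buffer once written
                    .then()
                    // close the channel on completion, error, or cancellation
                    .doFinally(signal -> {
                        try { channel.close(); } catch (IOException ignored) { }
                    });
        } catch (IOException exc) {
            return Mono.error(exc);
        }
    }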

With this implementation, your application will hold a few DataBuffer instances in memory at a given time (the reactive operators are prefetching values for performance reasons) and will write bytes as they come in a reactive fashion.
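To make the "write bytes as they come" idea concrete, here is a synchronous, JDK-only illustration of the same chunk-by-chunk pattern. It is only a sketch: it is not how DataBufferUtils.write is implemented, it blocks the calling thread, and the class name, method name, and use of byte[] chunks in place of DataBuffer are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical class, shown only to illustrate chunked writing.
public class ChunkedWriteSketch {

    // Writes each chunk to the file as it is handed over, so only the
    // current chunk has to be held in memory. Returns the bytes written.
    static long writeChunks(Iterable<byte[]> chunks, Path file) throws IOException {
        long written = 0;
        // try-with-resources closes the channel on success and on failure
        try (WritableByteChannel channel = Files.newByteChannel(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            for (byte[] chunk : chunks) {
                ByteBuffer buffer = ByteBuffer.wrap(chunk);
                // a single write() call may not drain the whole buffer
                while (buffer.hasRemaining()) {
                    written += channel.write(buffer);
                }
            }
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("spreadsheet", ".bin");
        long written = writeChunks(List.of("abc".getBytes(), "de".getBytes()), file);
        System.out.println(written + " bytes written to " + file);
    }
}
```

In the reactive version, the loop is replaced by the Flux<DataBuffer> emitting chunks, and closing the channel has to be tied to the termination of the pipeline rather than to a try-with-resources block.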

Brian Clozel answered Sep 23 '22 06:09