
Spring WebFlux throws "'producer' type is unknown" when I return a value in the response body

I'm using Spring Boot with Kotlin, and I'm trying to get a status value from a GET RESTful service by passing a handler to a reactive service.

I can see that the handler I'm passing is in the request, but whenever I'm building the body, I get this exception:

java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.0.RELEASE.jar:5.2.0.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException

Here is my code:

@Bean
fun getReceiptConversionStatus() = router {
    accept(MediaType.APPLICATION_JSON).nest {
        GET("/BsGetStatus/{handler}", ::handleGetStatusRequest)
    }
}

private fun handleGetStatusRequest(serverRequest: ServerRequest): Mono<ServerResponse> = ServerResponse
    .ok()
    .contentType(MediaType.APPLICATION_JSON)
    .body(GetStatusViewmodel(fromObject(serverRequest.pathVariable("handler"))), GetStatusViewmodel::class.java)
    .switchIfEmpty(ServerResponse.notFound().build())

And this is my view model:

data class GetStatusViewmodel(
        @JsonProperty("handler") val documentHandler: String
)
hasan.alkhatib asked Oct 23 '19 08:10


3 Answers

Flux and Mono are producers: they emit values. You are passing a plain GetStatusViewmodel as the body, not a producer, so the ReactiveAdapterRegistry cannot adapt it to a Publisher. That is why you get the error.

Your body needs to be of type Mono<GetStatusViewmodel>. You can either replace body with bodyValue (which accepts a plain object and handles the wrapping for you), or wrap your GetStatusViewmodel in a Mono using Mono#just before passing it to the body function.
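
The two options above might look roughly like this in the question's handler. This is a minimal sketch, assuming Spring Framework 5.2 or later (the stack trace shows spring-core 5.2.0.RELEASE), where bodyValue is available. The function names handleWithBodyValue and handleWithMono are hypothetical, and the fromObject call from the question is left out for simplicity.

```kotlin
import com.fasterxml.jackson.annotation.JsonProperty
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

data class GetStatusViewmodel(
    @JsonProperty("handler") val documentHandler: String
)

// Option 1: bodyValue takes a plain (non-reactive) object as the response body.
fun handleWithBodyValue(serverRequest: ServerRequest): Mono<ServerResponse> =
    ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(GetStatusViewmodel(serverRequest.pathVariable("handler")))

// Option 2: body(producer, elementClass) needs a producer, so wrap the object in a Mono.
fun handleWithMono(serverRequest: ServerRequest): Mono<ServerResponse> =
    ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(
            Mono.just(GetStatusViewmodel(serverRequest.pathVariable("handler"))),
            GetStatusViewmodel::class.java
        )
```

Either function could be referenced from the router in place of ::handleGetStatusRequest.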

Toerktumlare answered Oct 09 '22 06:10


For me, I was doing something like this:

webClient.post()
    .uri("/some/endpoint")
    .body(postRequestObj, PostRequest.class) // erroneous line 
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .bodyToMono(PostResponse.class)
    .timeout(Duration.ofMillis(5000))

The Spring docs for the body() function explain it like this:

Variant of body(Publisher, Class) that allows using any producer that can be resolved to Publisher via ReactiveAdapterRegistry.

Parameters:
     producer - the producer to write to the request
     elementClass - the type of elements produced

Returns:
    this builder

So the first parameter can't be just any object; it has to be a producer. Changing the code above to wrap my object in a Mono fixed the issue for me.

webClient.post()
    .uri("/some/endpoint")
    .body(Mono.just(postRequestObj), PostRequest.class)
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .bodyToMono(PostResponse.class)
    .timeout(Duration.ofMillis(5000))
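
As an alternative to Mono.just, the same request could probably be written with bodyValue, which accepts a plain object. The following Kotlin sketch assumes Spring Framework 5.2 or later. The send function and the fields of PostRequest and PostResponse are hypothetical, since the original snippet does not show them.

```kotlin
import java.time.Duration
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Mono

// Hypothetical payload types; the original answer does not show their fields.
data class PostRequest(val name: String)
data class PostResponse(val id: String)

// Hypothetical helper: posts a plain object without wrapping it in a Mono.
fun send(webClient: WebClient, postRequestObj: PostRequest): Mono<PostResponse> =
    webClient.post()
        .uri("/some/endpoint")
        .accept(MediaType.APPLICATION_JSON)
        .bodyValue(postRequestObj) // plain object, no producer required
        .retrieve()
        .bodyToMono(PostResponse::class.java)
        .timeout(Duration.ofMillis(5000))
```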

reference: https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/function/client/WebClient.RequestBodySpec.html

SomeGuy answered Oct 09 '22 07:10


I actually solved it, and I'm posting the solution here in case somebody makes the same mistake I did :( It was a typical mistake for those who work with Java: a wrong import.

I was using the fromObject() method in my application (I updated the question to match my actual code). A function with this name exists in both of the imports below, and I was passing the one from the wrong import to one of the overloaded body() functions:

//this is the wrong import I was using
import org.springframework.web.reactive.function.server.EntityResponse.fromObject
//this is the correct one for building the mono body
import org.springframework.web.reactive.function.BodyInserters.fromObject

With the method from BodyInserters, you can pass fromObject(T) to the body method, which then returns the Mono<ServerResponse> result.
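
A handler built on the BodyInserters import might look like the sketch below. It assumes Spring Framework 5.2 or later, where BodyInserters.fromObject is deprecated in favor of BodyInserters.fromValue, so the sketch uses fromValue. It also assumes the single-argument body(BodyInserter) overload, not the two-argument call shown in the question.

```kotlin
import com.fasterxml.jackson.annotation.JsonProperty
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.BodyInserters.fromValue
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

data class GetStatusViewmodel(
    @JsonProperty("handler") val documentHandler: String
)

// fromValue builds a BodyInserter from a plain object;
// body(inserter) then returns the Mono<ServerResponse>.
fun handleGetStatusRequest(serverRequest: ServerRequest): Mono<ServerResponse> =
    ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(fromValue(GetStatusViewmodel(serverRequest.pathVariable("handler"))))
```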

hasan.alkhatib answered Oct 09 '22 06:10
