I have a simple setup of server and client:
Flux.range(1, 5000)
.subscribeOn(Schedulers.parallel())
.flatMap(i -> WebClient.create()
.method(HttpMethod.POST)
.uri("http://localhost:8080/test")
.body(Mono.just(String.valueOf(i)), String.class)
.exchange())
.publishOn(Schedulers.parallel())
.subscribe(response ->
response.bodyToMono(String.class)
.publishOn(Schedulers.elastic())
.subscribe(body -> log.info("{}", body)));
here is the client:
@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) {
return body.delayElement(Duration.ofSeconds(5));
}
Both things run on netty. Maybe someone has an idea what is causing this behavior?
This is not due to a WebClient limitation about connection pools, but this actually comes from a Reactor implementation details that you can change.
By default, Reactor operators such as flatMap have prefetch=32 (the number of elements we request before the end subscriber asks for those) and maxConcurrency=256 (the maximum number of elements processed concurrently by the operator).
You can use variants of Flux.flatMap(Function mapper, int concurrency, int prefetch) to change that behavior.
Your code snippet is using a mix of subscribeOn and publishOn; I'd say that given you're doing reactive I/O work with this code snippet, you shouldn't try to schedule work on an elastic/parallel scheduler. Removing those operators is best here.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With