
Webflux parallel connections somehow limited to 256

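I have a simple setup of a server and a client. The client fires 5000 requests, but only 256 of them seem to be in flight at the same time. Here is the client: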

Flux.range(1, 5000)
        .subscribeOn(Schedulers.parallel())
        .flatMap(i -> WebClient.create()
            .method(HttpMethod.POST)
            .uri("http://localhost:8080/test")
            .body(Mono.just(String.valueOf(i)), String.class)
            .exchange())
        .publishOn(Schedulers.parallel())
        .subscribe(response ->
            response.bodyToMono(String.class)
                .publishOn(Schedulers.elastic())
                .subscribe(body -> log.info("{}", body)));

And here is the server:

@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) {
    return body.delayElement(Duration.ofSeconds(5));
}

Both run on Netty. Does anyone have an idea what is causing this behavior?

guapito asked Oct 14 '25 16:10


1 Answer

This is not due to a WebClient connection pool limitation; it comes from a Reactor implementation detail that you can change.

By default, Reactor operators such as flatMap have concurrency=256 (the maximum number of inner publishers the operator subscribes to at once, which is why only 256 requests are in flight) and prefetch=32 (the number of elements requested up front from each inner publisher).

You can use the Flux.flatMap(Function mapper, int concurrency, int prefetch) overload, or the variant that takes only the concurrency argument, to change that behavior.
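To make the effect of the concurrency argument visible without a server, here is a minimal sketch that replaces the HTTP call with a delayed Mono and records how many inner publishers are in flight at once. It assumes only reactor-core on the classpath; the class name FlatMapConcurrencyDemo, the peakInFlight helper and the 500 ms delay are made up for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapConcurrencyDemo {

    // Hypothetical helper: pushes `total` simulated calls through flatMap and
    // returns the highest number of inner publishers in flight at one time.
    // A concurrency of 0 or less means "use flatMap's default".
    static int peakInFlight(int total, int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // Stand-in for the WebClient call: completes after 500 ms.
        Function<Integer, Mono<Integer>> call = i ->
            Mono.delay(Duration.ofMillis(500))
                // Count the call as started when flatMap subscribes to it.
                .doOnSubscribe(s ->
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                // Count it as finished before completion reaches flatMap.
                .doOnNext(tick -> inFlight.decrementAndGet())
                .thenReturn(i);

        Flux<Integer> source = Flux.range(1, total);
        Flux<Integer> results;
        if (concurrency > 0) {
            results = source.flatMap(call, concurrency); // explicit limit
        } else {
            results = source.flatMap(call);              // default limit
        }
        results.blockLast(); // block only because this is a demo
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("default concurrency, peak in flight: "
            + peakInFlight(600, 0));
        System.out.println("concurrency=600, peak in flight: "
            + peakInFlight(600, 600));
    }
}
```

With the default, the first number should stop at the 256 cap described above, while the second run lets all 600 simulated calls start together.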

Your code snippet uses a mix of subscribeOn and publishOn; I'd say that since this snippet only does reactive, non-blocking I/O, you shouldn't try to schedule work on an elastic/parallel scheduler. Removing those operators is best here.
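Putting both suggestions together, the client from the question could look roughly like the sketch below. It assumes Spring WebFlux 5.2 or later (for bodyValue and retrieve) and the same http://localhost:8080/test endpoint; the class name ParallelPosts, the postAll helper and the concurrency value of 1000 are illustrative choices, not something taken from the question.

```java
import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

public class ParallelPosts {

    // Hypothetical helper: builds (without subscribing) a pipeline that POSTs
    // the numbers 1..total, with at most `concurrency` requests in flight.
    static Flux<String> postAll(WebClient client, int total, int concurrency) {
        return Flux.range(1, total)
            // No subscribeOn/publishOn: the I/O is already non-blocking.
            .flatMap(i -> client.post()
                    .uri("http://localhost:8080/test")
                    .bodyValue(String.valueOf(i))
                    .retrieve()
                    .bodyToMono(String.class),
                concurrency);
    }

    public static void main(String[] args) {
        // One shared client instead of WebClient.create() per request.
        WebClient client = WebClient.create();

        // Assumption: 1000 is an arbitrary value above the default of 256.
        // The HTTP client's own connection pool settings may also apply,
        // so check them for the version you are running.
        postAll(client, 5000, 1000)
            .doOnNext(body -> System.out.println(body))
            .blockLast(); // block only so that this demo JVM stays alive
    }
}
```

Reading the body inside the flatMap lambda also removes the nested subscribe call from the original snippet, so errors and completion flow through a single pipeline.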

Brian Clozel answered Oct 19 '25 04:10