
"WebClientRequestException: Pending acquire queue has reached its maximum size of 1000" with Spring reactive WebClient

I am load testing a microservice API that calls another microservice's API using Spring's reactive WebClient. I am using the Postman Runner tab to drive the test.

First, I ran the load with 1500 iterations: the second microservice was called for each request and everything worked as expected. But when I ran the load with 5000 iterations, the second microservice was called 3500 times and the remaining 1500 calls failed with this error:

WebClientRequestException: Pending acquire queue has reached its maximum size of 1000

I am using org.springframework.web.reactive.function.client.WebClient with the default configuration. Below is the code snippet:

    private WebClient webClient;

    @PostConstruct
    public void init() {
        this.webClient = WebClient.builder()
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

What can be done to avoid this?

I am using the latest spring-boot-starter-parent (version 2.5.3) with spring-webflux-5.3.9.jar.

The logs:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
        at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:375)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:885)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        |_ checkpoint ⇢ Request to POST http://172.20.0.2:3130/v1/login/mobile [DefaultWebClient]
Stack trace:
                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
                at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
                at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:444)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
                at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
                at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:216)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:269)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
                at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861)
                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
                at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
                at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
        at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)

rajat behl asked Dec 31 '22 13:12


1 Answer

WebClient needs an HTTP client library to perform requests, and by default it uses Reactor Netty.

Quote from the Reactor Netty reference docs:

By default, Reactor Netty client uses a “fixed” connection pool with 500 as the maximum number of active channels and 1000 as the maximum number of further channel acquisition attempts allowed to be kept in a pending state (for the rest of the configurations check the system properties or the builder configurations below). This means that the implementation creates a new channel if someone tries to acquire a channel as long as less than 500 have been created and are managed by the pool. When the maximum number of channels in the pool is reached, up to 1000 new attempts to acquire a channel are delayed (pending) until a channel is returned to the pool again, and further attempts are declined with an error.

What you are seeing is that you are actively using all 500 of the connections in the connection pool and you have filled up the "pending" queue with 1000 pending requests.
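
To make the limits concrete, here is a minimal sketch of a pool with a cap on active connections and a bounded pending queue. It is a toy model in plain Java, not Reactor Netty's implementation: the class `BoundedPoolModel`, its methods, and the helper `rejectedInBurst` are hypothetical names made up for illustration. The burst helper models the worst case where no connection is returned while requests keep arriving.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a fixed pool with a bounded pending-acquire queue (illustration only). */
public class BoundedPoolModel {
    public enum Outcome { ACTIVE, PENDING, REJECTED }

    private final int maxConnections;          // 500 by default in Reactor Netty, per the quote above
    private final int pendingAcquireMaxCount;  // 1000 by default, per the quote above
    private final Deque<Integer> pending = new ArrayDeque<>();
    private int active = 0;

    public BoundedPoolModel(int maxConnections, int pendingAcquireMaxCount) {
        this.maxConnections = maxConnections;
        this.pendingAcquireMaxCount = pendingAcquireMaxCount;
    }

    /** A request asks for a connection: it runs, waits, or is declined. */
    public Outcome acquire(int requestId) {
        if (active < maxConnections) {
            active++;
            return Outcome.ACTIVE;
        }
        if (pending.size() < pendingAcquireMaxCount) {
            pending.addLast(requestId);
            return Outcome.PENDING;
        }
        return Outcome.REJECTED; // corresponds to the "Pending acquire queue has reached..." error
    }

    /** A request finishes; its connection is handed to the oldest waiter, if there is one. */
    public void release() {
        if (active == 0) {
            throw new IllegalStateException("no active connection to release");
        }
        if (pending.pollFirst() == null) {
            active--; // nobody waiting, so the connection becomes idle
        }
    }

    public int active() { return active; }

    public int pending() { return pending.size(); }

    /** Counts rejections when `burst` requests arrive and nothing is released in between. */
    public static int rejectedInBurst(int burst, int maxConnections, int pendingAcquireMaxCount) {
        BoundedPoolModel pool = new BoundedPoolModel(maxConnections, pendingAcquireMaxCount);
        int rejected = 0;
        for (int i = 0; i < burst; i++) {
            if (pool.acquire(i) == Outcome.REJECTED) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejectedInBurst(1500, 500, 1000)); // prints 0
        System.out.println(rejectedInBurst(5000, 500, 1000)); // prints 3500
    }
}
```

With the default limits, 1500 in-flight requests fit exactly (500 active plus 1000 pending), which matches the first run in the question. In the 5000-iteration run fewer requests failed than this worst case suggests, presumably because connections were being released and reused while the test was still running.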

You have two options to solve this:

Scale vertically

Increase the connection pool size and/or the pending acquire queue length:

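import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

// Replace the <...> placeholders with values sized for your load.
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
        .maxConnections(<your_desired_max_connections>)
        .pendingAcquireMaxCount(<your_desired_pending_queue_size>)
        .build();
ReactorClientHttpConnector clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create(connectionProvider));
WebClient webClient = WebClient.builder()
        .clientConnector(clientHttpConnector)
        .build();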

Scale horizontally

Create additional instances of your app and load balance the API calls between them.

Spring reference docs

Additional note:

It's worth considering the latency of your downstream API call when calculating the size of your connection pool. A good place to start is

connection_pool_size = tps * downstream_api_latency

where tps is transactions per second and downstream_api_latency is the average response time of the downstream API, in seconds.
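
As a worked example of the formula above, here is a small sketch in plain Java. The class `PoolSizing`, the method `estimatePoolSize`, and the traffic figures are hypothetical and chosen only for illustration. The latency is taken in milliseconds and converted to seconds, and the result is rounded up. Treat it as a starting point to validate under load, not a guaranteed value.

```java
import java.time.Duration;

/** Starting-point estimate for a connection pool size (illustration only). */
public class PoolSizing {

    /**
     * connection_pool_size = tps * downstream_api_latency, with the latency
     * converted from milliseconds to seconds and the result rounded up.
     */
    public static int estimatePoolSize(double tps, Duration downstreamLatency) {
        double connections = tps * downstreamLatency.toMillis() / 1000.0;
        return (int) Math.ceil(connections);
    }

    public static void main(String[] args) {
        // Assumed load: 2000 requests per second, each downstream call taking 250 ms.
        System.out.println(estimatePoolSize(2000, Duration.ofMillis(250))); // prints 500
        // Assumed load: 300 requests per second, each downstream call taking 200 ms.
        System.out.println(estimatePoolSize(300, Duration.ofMillis(200)));  // prints 60
    }
}
```

Under these assumed numbers, 2000 tps at 250 ms per call would already keep all 500 default connections busy, so any extra latency or traffic would start filling the pending queue.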

Michael McFadyen answered Mar 09 '23 01:03