Blocking operations in Reactive application using MongoDB reactive drivers

I am developing a reactive REST API using the MongoDB reactive driver and the Spring WebFlux framework.

Everything works fine when the concurrency level is below 100. However, when I increase the concurrency level to 200, BlockHound reports blocking calls to MongoDB.

I am running mongod (MongoDB Community edition) on Windows, with BlockHound installed in the Spring Boot application and ApacheBench to test the concurrent behaviour.

Below is the error. It is reported only when the concurrency level is above 100.

Any suggestions on how to resolve this error?

Please note that I have also reported this issue to the MongoDB community.

They replied that it may be a false positive, as the system may not have enough resources to create more connections. If there are no resources to create more connections, should BlockHound report that as a blocking call?

2021-01-02 09:52:17.399 ERROR 18688 --- [reactor-http-nio-2] org.mongodb.driver.operation : Callback onResult call produced an error.

reactor.blockhound.BlockingOperationError: Blocking call! jdk.internal.misc.Unsafe#park
    at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java)
    at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
    at java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
    at java.base/java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:409)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1347)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:714)
    at com.mongodb.internal.connection.DefaultConnectionPool.getAsync(DefaultConnectionPool.java:157)
    at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:105)
    at com.mongodb.internal.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:131)
    at com.mongodb.internal.async.client.ClientSessionBinding$SessionBindingAsyncConnectionSource.getConnection(ClientSessionBinding.java:140)
    at com.mongodb.internal.operation.OperationHelper.withAsyncConnectionSource(OperationHelper.java:730)
    at com.mongodb.internal.operation.OperationHelper.access$200(OperationHelper.java:68)
    at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:750)
    at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:738)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
    at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:208)
    at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:196)
    at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:105)
    at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:99)
    at com.mongodb.internal.connection.BaseCluster$ServerSelectionRequest.onResult(BaseCluster.java:432)
    at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:299)
    at com.mongodb.internal.connection.BaseCluster.selectServerAsync(BaseCluster.java:155)
    at com.mongodb.internal.connection.SingleServerCluster.selectServerAsync(SingleServerCluster.java:42)
    at com.mongodb.internal.binding.AsyncClusterBinding.getAsyncClusterBindingConnectionSource(AsyncClusterBinding.java:99)
    at com.mongodb.internal.binding.AsyncClusterBinding.getReadConnectionSource(AsyncClusterBinding.java:84)
    at com.mongodb.internal.async.client.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:58)
    at com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection(OperationHelper.java:677)
    at com.mongodb.internal.operation.FindOperation.executeAsync(FindOperation.java:689)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:86)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:74)
    at com.mongodb.internal.async.client.OperationExecutorImpl.getReadWriteBinding(OperationExecutorImpl.java:177)
    at com.mongodb.internal.async.client.OperationExecutorImpl.access$200(OperationExecutorImpl.java:43)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:72)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:66)
    at com.mongodb.internal.async.client.ClientSessionHelper.createClientSession(ClientSessionHelper.java:60)
    at com.mongodb.internal.async.client.ClientSessionHelper.withClientSession(ClientSessionHelper.java:51)
    at com.mongodb.internal.async.client.OperationExecutorImpl.execute(OperationExecutorImpl.java:66)
    at com.mongodb.internal.async.client.AsyncMongoIterableImpl.batchCursor(AsyncMongoIterableImpl.java:167)
    at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.requestInitialData(MongoIterableSubscription.java:45)
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.tryRequestInitialData(AbstractSubscription.java:177)
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.request(AbstractSubscription.java:100)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
    at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.<init>(MongoIterableSubscription.java:39)
    at com.mongodb.reactivestreams.client.internal.Publishers.lambda$publish$0(Publishers.java:43)
    at com.mongodb.reactivestreams.client.internal.FindPublisherImpl.subscribe(FindPublisherImpl.java:175)
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
    at reactor.core.publisher.Operators.error(Operators.java:196)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)
    at reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8133)
    at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:383)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:540)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:252)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
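
For readers following the trace: the top frames show DefaultConnectionPool.getAsync submitting a task to an executor, whose LinkedBlockingQueue.offer takes a ReentrantLock, and the reported call is the Unsafe#park underneath that lock. An uncontended ReentrantLock.lock() does not park, which would fit the error appearing only at higher concurrency. Below is a minimal stdlib-only sketch of that mechanism. It does not use the MongoDB driver or BlockHound, and the class and method names are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: shows that a contended ReentrantLock.lock() parks the caller.
final class ContendedLockDemo {

    // Returns the state of a thread that calls lock() while another thread holds the lock.
    static Thread.State stateOfContendedLocker() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the calling thread holds the lock, like a busy producer would
        Thread contender = new Thread(() -> {
            lock.lock();   // contended acquire: the thread ends up in LockSupport.park
            lock.unlock();
        });
        contender.start();
        try {
            long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 s
            // Poll until the contender has parked (or the deadline passes).
            while (contender.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            return contender.getState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock(); // let the contender finish
        }
    }

    public static void main(String[] args) {
        System.out.println(stateOfContendedLocker());
    }
}
```

The contender should be observed as parked while the lock is held. On a thread that BlockHound treats as non-blocking, such as reactor-http-nio-2 in the log above, a park like this is what gets reported, however brief it is.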
asked Dec 07 '25 11:12 by bigo1


1 Answer

A common failure mode is that MongoDB throws an exception when its own queue of pending query requests grows too large. WebFlux is efficient at passing requests through, so the load piles up at the MongoDB driver on the database side; backpressure is not propagated across the connection.
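
One way to picture missing backpressure is as an unbounded number of in-flight operations. A minimal sketch of bounding them with stdlib classes only, assuming each database call can be represented as a CompletableFuture; InFlightLimiter and its methods are hypothetical names, not part of the MongoDB driver or WebFlux:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: caps the number of asynchronous operations in flight.
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Starts the operation if a permit is free; otherwise fails fast without blocking.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        if (!permits.tryAcquire()) { // tryAcquire() never parks the calling thread
            return CompletableFuture.failedFuture(
                    new IllegalStateException("too many in-flight operations"));
        }
        CompletableFuture<T> result;
        try {
            result = operation.get();
        } catch (RuntimeException e) {
            permits.release(); // the operation never started, so return the permit
            return CompletableFuture.failedFuture(e);
        }
        // Return the permit once the operation completes, successfully or not.
        return result.whenComplete((value, error) -> permits.release());
    }

    int available() {
        return permits.availablePermits();
    }
}
```

Rejecting when saturated is only one possible policy; queueing or delaying the request are others. In a Reactor pipeline, the concurrency argument of Flux.flatMap plays a similar role for inner publishers.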

I can't be sure without seeing the code, but that is one thing to investigate.

You may also be making a simple mistake in your code and actually calling blocking code in your web handlers.
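
If a handler does turn out to call blocking code, the usual remedy is to run that call on a separate pool rather than on the event-loop thread. A minimal stdlib-only sketch of the idea; BlockingOffload, the pool size, and the thread name are assumptions made for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper: runs blocking work on a dedicated pool instead of the caller's thread.
final class BlockingOffload {
    // Assumed pool size of 4; daemon threads so the pool does not keep the JVM alive.
    private static final ExecutorService BLOCKING_POOL =
            Executors.newFixedThreadPool(4, runnable -> {
                Thread thread = new Thread(runnable, "blocking-worker");
                thread.setDaemon(true);
                return thread;
            });

    // The supplier executes on a "blocking-worker" thread; the caller gets a future back.
    static <T> CompletableFuture<T> offload(Supplier<T> blockingCall) {
        return CompletableFuture.supplyAsync(blockingCall, BLOCKING_POOL);
    }
}
```

In a WebFlux application the equivalent is typically Mono.fromCallable(...) combined with subscribeOn(Schedulers.boundedElastic()), which keeps the blocking call off the reactor-http-nio threads.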

answered Dec 09 '25 01:12 by bbuck


