Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Getting exception while doing block() on Mono object I got back from ReactiveMongoRepository object

I have a service that streams data to a second service that receives stream of objects and saves them to my MongoDB. inside my subscribe function on the Flux object that I get from the streaming service I use the save method from the ReactiveMongoRepository interface. when I try to use the block function and get the data I get the following error :

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

my code:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                });

after some digging, I manage to get it to work but I don't understand why it works now. the new code:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                       Mono<Quote> savedQuote = quoteRepository.insert(quote);
                       savedQuote.subscribe(result ->
                                 System.out.println("I saved a quote! Id :: " + result.getId()));
    });

the definition of block(): Subscribe to this Mono and block indefinitely until a next signal is received.

the definition of subscribe(): Subscribe to this Mono and request unbounded demand.

can someone help me understand why the block didn't work and the subscribe worked? what am I missing here?

like image 269
israel berko Avatar asked Oct 11 '19 10:10

israel berko


People also ask

What does Mono empty return?

Mono. empty() is a method invocation that returns a Mono that completes emitting no item. It represents an empty publisher that only calls onSubscribe and onComplete .

What is blockFirst ()?

blockFirst(Duration timeout) Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. T. blockLast() Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes.

What does block do in WebClient?

block() would sleep the original thread while assigning another thread to perform the http-request. Compared to the alternative RestTemplate, it seems like WebClient would spend additional resources by using the event loop.


1 Answers

Blocking is bad, since it ties up a thread waiting for a response. It's very bad in a reactive framework which has few threads at its disposal, and is designed so that none of them should be unnecessarily blocked.

This is the very thing that reactive frameworks are designed to avoid, so in this case it simply stops you doing it:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4

Your new code, in contrast, works asynchronously. The thread isn't blocked, as nothing actually happens until the repository returns a value (and then the lambda that you passed to savedQuote.subscribe() is executed, printing out you result to the console.)

However, the new code still isn't optimal / normal from a reactive streams perspective, as you're doing all your logic in your subscribe method. The normal thing to do is to us a series of flatMap/map calls to transform the items in the stream, and use doOnNext() for side effects (such as printing out a value):

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
            .subscribe();

If you're doing any serious amount of work with reactor / reactive streams, it would be worth reading up on them in general. They're very powerful for non-blocking work, but they do require a different way of thinking (and coding) than more "standard" Java.

like image 126
Michael Berry Avatar answered Sep 30 '22 13:09

Michael Berry