Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring 5 Reactive fails when extending Flux/implementing Publisher and calling s.onNext() more than once

I just started playing with the new Spring 5 reactive support and wanted to simulate some asyncronous data generation, having noticed two faulty behaviours:

1) Calling s.onNext( String ) more than once:

@GetMapping("/strings")
public Publisher<String> getStrings(){

    return new Publisher<String>() {

        @Override
        public void subscribe(Subscriber<? super String> s) {
            int i = 0;
            while(++i <= 5){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                s.onNext("message");
            }
            s.onComplete();
        }
    };
}

In this case the stacktrace is:

2016-08-03 13:35:04.986 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /strings
2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<java.lang.String> com.codependent.spring5.playground.reactive.web.AccountsController.getStrings()]
2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsController'
2016-08-03 13:35:07.120 DEBUG 5136 --- [nio-8080-exec-1] o.s.w.s.h.ExceptionHandlingWebHandler    : Could not complete request

java.lang.IllegalStateException: RECEIVED
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) [classes/:na]
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) [spring-web-5.0.0.M1.jar:5.0.0.M1]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) [spring-web-5.0.0.M1.jar:5.0.0.M1]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

2016-08-03 13:35:07.121 DEBUG 5136 --- [nio-8080-exec-1] o.s.h.s.r.ServletServerHttpResponse      : Can't set the status 500 because the HTTP response has already been committed
2016-08-03 13:35:08.127 ERROR 5136 --- [nio-8080-exec-1] a.c.c.C.[.[.0.0.0.[.[httpHandlerServlet] : Servlet.service() for servlet [httpHandlerServlet] in context with path [] threw exception

reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: RECEIVED
    at reactor.core.Exceptions.bubble(Exceptions.java:97) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onError(MonoThenApply.java:209) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:105) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.OperatorAdapter.doOnSubscriberError(OperatorAdapter.java:113) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:91) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) ~[classes/:na]
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: java.lang.IllegalStateException: RECEIVED
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
    ... 57 common frames omitted

2) Calling s.onNext( Alert.class -any DTO- ) more than once:

@GetMapping("/alerts")
public Publisher<Alert> getAlerts(){

    return new Publisher<Alert>() {

        @Override
        public void subscribe(Subscriber<? super Alert> s) {
            int i = 0;
            while(++i <= 5){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                s.onNext(new Alert((long)1, "ms"));
            }
            s.onComplete();
        }
    };
}

Now it doesn't show any error on the logs but the caller gets a 500 response code and the content '['.

Log:

2016-08-03 13:37:11.834 DEBUG 5136 --- [nio-8080-exec-3] o.s.web.reactive.DispatcherHandler       : Processing GET request for [http://localhost:8080/alerts]
2016-08-03 13:37:11.835 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /alerts
2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsController.getAlerts()]
2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsController'

Why can't we invoke onNext() multiple times? How could we do that?

NOTE: I if just call onNext once it works ok:

@Override
public void subscribe(Subscriber<? super String> s) {
    s.onNext("my message" + Math.random());
    s.onComplete();
}

or

@Override
public void subscribe(Subscriber<? super Alert> s) {
    s.onNext(new Alert((long)1, "ms"));
    s.onComplete();
}
like image 771
codependent Avatar asked Aug 02 '16 08:08

codependent


2 Answers

My Publisher implementation didn't follow the reactive streams spec, this is how I fixed it:

@GetMapping(value="/strings", produces="text/event-stream")
public Publisher<String> getStrings(){
    return new Publisher<String>() {

        private int loops = 5;

        @Override
        public void subscribe(Subscriber<? super String> s) {

            s.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    for (int i = 0; i < n; i++) {
                        if(loops-- > 0){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            s.onNext("message"+Math.random());                          
                        }else{
                            s.onComplete();
                        }
                    }
                }
                @Override
                public void cancel() {
                    loops = 0;
                }
            });
        }
    };
}

If you want to learn more about this, have a look at the issue I opened in Spring's JIRA and the helpful comment I got there.

like image 132
codependent Avatar answered Nov 18 '22 20:11

codependent


I haven't tested this yet as I'm a bit busy - will do later so if this doesn't work sorry! :)

From the above comments it looks like the problem is with your Flux creation.

I'm assuming that Spring Reactive Controllers are able to handle a Flux which emits multiple without this being over WebSockets or SSE. Again, I'll have a play a bit later.

Flux has a lot of static methods for construction which will help you here.

How about doing it the following way:

return Flux.intervalMillis(1000)
.map(l -> new new SensorRead(sensorId, Math.random()));

But this will give you a never ending stream which might not be what you want.

The other option is something like this:

return Flux.range(1, 5) //Spit out 5 values starting from 1
.delayMillis(1000) //Delay the onNext calls to separate 1 second apart
.map(l -> new new SensorRead(sensorId, Math.random()));

Update

OK, so this question has changed quite significantly.

In answer to "Why can't we invoke onNext() multiple times? How could we do that?"

Of course I didn't write the API so the reasoning I can't answer, but IMO there is an ambiguity and complexity as to how one would want to handle the multiple emissions in the miriad of different ways it can be expressed.

HTTP 1.1 doesn't allow multiple responses per request so the only valid option is some collect into a list or low level write the onNext to the output stream for each emission - both of which have complexities around content type (EG XML vs JSON)

This is further complicated when we bring in HTTP2, WebSockets and SSE which can do some form of multiple responses per request - again each needing to be handled differently.

If you want to be able to do multiple emissions, then you'll need to look at WebSockets or SSE.

The Spring-Reactive project does have SSE classes so looks like it's implemented.

EG

@RequestMapping("/sse/event")
    Flux<SseEvent> sse() {
        return Flux.interval(Duration.ofMillis(100)).map(l -> {
            SseEvent event = new SseEvent();
            event.setId(Long.toString(l));
            event.setData("foo");
            event.setComment("bar");
            return event;
        }).take(2);
    }

Have a look at below for more examples:

https://github.com/spring-projects/spring-reactive/blob/master/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Hope this helps

like image 21
Will Avatar answered Nov 18 '22 22:11

Will