Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Boot Webflux/Netty - Detect closed connection

I've been working with spring-boot 2.0.0.RC1 using the webflux starter (spring-boot-starter-webflux). I created a simple controller that returns a infinite flux. I would like that the Publisher only does its work if there is a client (Subscriber). Let's say I have a controller like this one:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}

Now, when I try to run that code and access the endpoint http://localhost:8080/ with Chrome, then I can see the data. However, once I close the browser the while-loop continues since no cancel event has been fired. How can I terminate/cancel the streaming as soon as I close the browser?

From this answer I quote that:

Currently with HTTP, the exact backpressure information is not transmitted over the network, since the HTTP protocol doesn't support this. This can change if we use a different wire protocol.

I assume that, since backpressure is not supported by the HTTP protocol, it means that no cancel request will be made either.

Investigating a little bit further, by analyzing the network traffic, showed that the browser sends a TCP FIN as soon as I close the browser. Is there a way to configure Netty (or something else) so that a half-closed connection will trigger a cancel event on the publisher, making the while-loop stop?

Or do I have to write my own adapter similar to org.springframework.http.server.reactive.ServletHttpHandlerAdapter where I implement my own Subscriber?

Thanks for any help.

EDIT: An IOException will be raised on the attempt to write data to the socket if there is no client. As you can see in the stack trace.

But that's not good enough, since it might take a while before the next chunk of data will be ready to send and therefore it takes the same amount of time to detect the gone client. As pointed out in Brian Clozel's answer it is a known issue in Reactor Netty. I tried to use Tomcat instead by adding the dependency to the POM.xml. Like this:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

Although it replaces Netty and uses Tomcat instead, it does not seem reactive due to the fact that the browser does not show any data. However, there is no warning/info/exception in the console. Is spring-boot-starter-webflux as of this version (2.0.0.RC1) supposed to work together with Tomcat?

like image 828
Michael Stadler Avatar asked Feb 15 '18 11:02

Michael Stadler


2 Answers

Since this is a known issue (see Brian Clozel's answer), I ended up using one Flux to fetch my real data and having another one in order to implement some sort of ping/heartbeat mechanism. As a result, I merge both together with Flux.merge().

Here you can see a simplified version of my solution:

@RestController
public class Demo {

    public interface Notification{}

    public static class MyData implements Notification{
        …
        public boolean isEmpty(){…}
    }

    @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
        return Flux.merge(getEventMessageStream(), getHeartbeatStream());
    }

    private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
        return Flux.interval(Duration.ofSeconds(2))
                .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                .doFinally(signalType ->System.out.println("END"));
    }

    private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
        return Flux.interval(Duration.ofSeconds(30))
                .map(i -> {

                    // TODO e.g. fetch data from somewhere,
                    // if there is no data return an empty object

                    return data;
                })
                .filter(data -> !data.isEmpty())
                .map(data -> ServerSentEvent
                        .builder(data)
                        .event("message").build());
    }
}

I wrap everything up as ServerSentEvent<? extends Notification>. Notification is just a marker interface. I use the event field from the ServerSentEvent class in order to separate between data and ping events. Since the heartbeat Flux sends events constantly and in short intervals, the time it takes to detect that the client is gone is at most the length of that interval. Remember, I need that because it might take a while before I get some real data that can be sent and, as a result, it might also take a while before it detects that the client is gone. Like this, it will detect that the client is gone as soon as it can’t sent the ping (or possibly the message event).

One last note on the marker interface, which I called Notification. This is not really necessary, but it gives some type safety. Without that, we could write Flux<ServerSentEvent<?>> instead of Flux<ServerSentEvent<? extends Notification>> as return type for the getNotificationStream() method. Or also possible, make getHeartbeatStream() return Flux<ServerSentEvent<MyData>>. However, like this it would allow that any object could be sent, which I don’t want. As a consequence, I added the interface.

like image 115
Michael Stadler Avatar answered Oct 10 '22 16:10

Michael Stadler


I'm not sure why this behaves like this, but I suspect it is because of the choice of generation operator. I think using the following would work:

    return Flux.interval(Duration.ofMillis(500))
    .map(input -> {
        return "DATA";
    });

According to Reactor's reference documentation, you're probably hitting the key difference between generate and push (I believe a quite similar approach using generate would probably work as well).

My comment was referring to the backpressure information (how many elements a Subscriber is willing to accept), but the success/error information is communicated over the network.

Depending on your choice of web server (Reactor Netty, Tomcat, Jetty, etc), closing the client connection might result in:

  • a cancel signal being received on the server side (I think this is supported by Netty)
  • an error signal being received by the server when it's trying to write on a connection that's been closed (I believe the Servlet spec does not provide that that callback and we're missing the cancel information).

In short: you don't need to do anything special, it should be supported already, but your Flux implementation might be the actual problem here.

Update: this is a known issue in Reactor Netty

like image 30
Brian Clozel Avatar answered Oct 10 '22 15:10

Brian Clozel