
Spring SseEmitter: events aren't sent right after send() is called

I am trying to implement server-sent events with Spring 4 (Tomcat 7, servlet-api 3.0.1).

The problem is that my events aren't sent right after send is called. They all arrive at the client simultaneously (with the same timestamp), and only after the SseEmitter times out, together with EventSource's error event. The client then tries to reconnect. Any idea what's happening?

I have created a simple service:

@RequestMapping(value = "subscribe", method = RequestMethod.GET)
public SseEmitter subscribe () throws IOException {
    final SseEmitter emitter = new SseEmitter();
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                emitter.send(SseEmitter.event().data("Thread writing: " + Thread.currentThread()).name("ping"));
            } catch (Exception e) {
            }
        }
    } , 1000, 1000, TimeUnit.MILLISECONDS);
    return emitter;
}

with client code:

sse = new EventSource(urlBuilder(base, url));
sse.addEventListener('ping', function (event) {
    dfd.notify(event);
});

sse.addEventListener('message', function(event){
    dfd.notify(event);
});

sse.addEventListener('close', function(event){
    dfd.notify(event);
});

sse.onerror = function (error) {
    console.log(error);
};

sse.onmessage = function (event){
    dfd.notify(event);
};
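
For reference, the 'ping' listener above only fires for frames that carry an `event: ping` field; frames without an event field are dispatched to 'message'. The following is a minimal sketch of that text/event-stream framing and dispatch rule in plain Java. The class and method names are hypothetical, and this is not Spring's or the browser's actual implementation, only a simplified illustration of the wire format.

```java
class SseFraming {
    /** Builds one frame: optional "event:" line, one "data:" line per line of data, then a blank line. */
    static String frame(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // the blank line terminates the event
    }

    /** Returns the listener type a frame would be dispatched to (simplified: only the "event" field is read). */
    static String dispatchType(String frame) {
        String type = "";
        for (String line : frame.split("\n")) {
            if (line.startsWith("event:")) {
                String value = line.substring("event:".length());
                // A single leading space after the colon is not part of the value.
                type = value.startsWith(" ") ? value.substring(1) : value;
            }
        }
        return type.isEmpty() ? "message" : type;
    }
}
```

With this sketch, `SseFraming.dispatchType(SseFraming.frame("ping", "hello"))` yields "ping", while a frame built with a null event name falls back to "message".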

App initializer code:

public class WebAppInitializer implements WebApplicationInitializer {
    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext();
        ctx.register(AppConfig.class);
        ctx.setServletContext(servletContext);
        ctx.refresh();

        ServletRegistration.Dynamic dynamic = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx));
        dynamic.setAsyncSupported(true);
        dynamic.addMapping("/api/*");
        dynamic.setLoadOnStartup(1);
        dynamic.setMultipartConfig(ctx.getBean(MultipartConfigElement.class));

        javax.servlet.FilterRegistration.Dynamic filter = servletContext
                .addFilter("StatelessAuthenticationFilter",
                        ctx.getBean("statelessAuthenticationFilter", StatelessAuthenticationFilter.class));
        filter.setAsyncSupported(true);
        filter.addMappingForUrlPatterns(null, false, "/api/*");

        filter = servletContext.addFilter("HibernateSessionRequestFilter",
                ctx.getBean("hibernateSessionRequestFilter", HibernateSessionRequestFilter.class));
        filter.setAsyncSupported(true);
        filter.addMappingForUrlPatterns(null, false, "/api/user/*");
    }
}

AppConfig.java

@Configuration
@ComponentScan("ru.esoft.workflow")
@EnableWebMvc
@PropertySource({"classpath:mail.properties", "classpath:fatclient.properties"})
@EnableAsync
@EnableScheduling
public class AppConfig extends WebMvcConfigurerAdapter {
...
}

Image of my client log: (image not available)

Timofey Novitskiy asked Dec 09 '15 00:12



1 Answer

I ran into this myself when testing SseEmitter. From what I've read online, SseEmitter is typically used together with some implementation of Reactive Streams, such as RxJava. It's a bit complex, but it works. The idea is that you create the emitter, wrap a Publisher in an Observable, and subscribe to that Observable. The Publisher does its work on a separate thread and signals the Observable whenever output is ready, and the subscriber's callback then calls emitter.send. Here is an example snippet that should do what you want:

@RequestMapping("/whatever")
public SseEmitter index() {
    SseEmitter emitter = new SseEmitter();
    Publisher<String> responsePublisher = someResponseGenerator.getPublisher();
    Observable<String> responseObservable = RxReactiveStreams.toObservable(responsePublisher);

    // Forward each item to the client; pass errors and completion on to the emitter.
    responseObservable.subscribe(
        str -> {
            try {
                emitter.send(str);
            } catch (IOException ex) {
                emitter.completeWithError(ex);
            }
        },
        error -> {
            emitter.completeWithError(error);
        },
        emitter::complete
    );

    return emitter;
}

Here is the corresponding Publisher:

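import java.util.concurrent.*;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class SomeResponseGenerator {
    public Publisher<String> getPublisher() {
        return new Publisher<String>() {
            @Override
            public void subscribe(final Subscriber<? super String> subscriber) {
                final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
                // Reactive Streams requires onSubscribe to be signalled before any onNext.
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) { /* demand is ignored in this simplified example */ }

                    @Override
                    public void cancel() {
                        scheduler.shutdownNow(); // stop emitting once the subscriber goes away
                    }
                });
                // Emit one item per second from the scheduler thread.
                scheduler.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext("Thread writing: " + Thread.currentThread().getName());
                    }
                }, 1000, 1000, TimeUnit.MILLISECONDS);
            }
        };
    }
}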
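
The same publisher-to-emitter wiring can also be sketched with nothing but the JDK, which may make the moving parts easier to see. This is only an illustration under stated assumptions: it swaps RxJava and org.reactivestreams for `java.util.concurrent.Flow` and `SubmissionPublisher` (Java 9+), and `EventSink` is a hypothetical stand-in for the three SseEmitter calls used above (send, complete, completeWithError), not a Spring type.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class EmitterBridgeSketch {
    /** Hypothetical stand-in for the SseEmitter methods used in the answer. */
    interface EventSink {
        void send(String data) throws Exception;
        void complete();
        void completeWithError(Throwable t);
    }

    /** Returns a subscriber that forwards each published item to the sink, one at a time. */
    static Flow.Subscriber<String> forwardTo(EventSink sink) {
        return new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for the first item
            }
            @Override public void onNext(String item) {
                try {
                    sink.send(item);
                    subscription.request(1); // ask for the next item only after a successful send
                } catch (Exception ex) {
                    subscription.cancel();
                    sink.completeWithError(ex);
                }
            }
            @Override public void onError(Throwable t) { sink.completeWithError(t); }
            @Override public void onComplete() { sink.complete(); }
        };
    }

    /** Publishes the items from another thread and returns everything the sink received, in order. */
    static List<String> run(String... items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        EventSink sink = new EventSink() {
            public void send(String data) { received.add(data); }
            public void complete() { received.add("<complete>"); done.countDown(); }
            public void completeWithError(Throwable t) { received.add("<error>"); done.countDown(); }
        };
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(forwardTo(sink));
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

In a real controller the sink would delegate to the SseEmitter, and the publisher would live longer than a single method call; the sketch closes it immediately only so that the flow from first item to completion is visible in one place.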

There are a few examples of this model online here and here, and you can find more by Googling 'RxJava SseEmitter'. It takes some time to grok the Reactive Streams/RxJava/SseEmitter interactions, but once you do it is pretty elegant. Hope this sets you on the right path!

Kevin Page answered Sep 26 '22 06:09
