I'm trying to set up a Spring SseEmitter to send a sequence of updates of the status of a running job. It seems to be working but:
Whenever I call emitter.complete()
in in my Java server code, the javascript EventSource
client calls the registered onerror
function and then calls my Java endpoint again with a new connection. This happens in both Firefox and Chrome.
I can probably send an explicit "end-of-data" message from Java and then detect that and call eventSource.close()
on the client, but is there a better way?
What is the purpose of emitter.complete()
in that case?
Also, if I always have to terminate the connection on the client end, then I guess every connection on the server side will be terminated by either a timeout or a write error, in which case I probably want to manually send back a heartbeat of some kind every few seconds?
It feels like I'm missing something if I'm having to do all this.
3. SseEmitter. SseEmitter is actually a subclass of ResponseBodyEmitter and provides additional Server-Sent Event (SSE) support out-of-the-box.
SSE is designed to use the JavaScript EventSource API in order to subscribe to a stream of data in any popular browser. Through this interface a client requests a particular URL in order to receive an event stream. SSE is commonly used to send message updates or continuous data streams to a browser client.
What are Server-Sent Events? SSE definition states that it is an http standard that allows a web application to handle a unidirectional event stream and receive updates whenever the server emits data. In simple terms, it is a mechanism for unidirectional event streaming.
The event stream is a simple stream of text data which must be encoded using UTF-8. Messages in the event stream are separated by a pair of newline characters. A colon as the first character of a line is in essence a comment, and is ignored.
I have added the following to my Spring boot application to trigger the SSE connection close()
Server Side:
On complete send an event of type complete via the SseEmitter.
@RestController
public class SearchController {
@Autowired
private SearchDelegate searchDelegate;
@GetMapping(value = "/{customerId}/search")
@ResponseStatus(HttpStatus.OK)
@ApiOperation(value = "Search Sources", notes = "Search Sources")
@ApiResponses(value = {
@ApiResponse(code = 201, message = "OK"),
@ApiResponse(code = 401, message = "Unauthorized")
})
@ResponseBody
public SseEmitter search(@ApiParam(name = "searchCriteria", value = "searchCriteria", required = true) @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
return searchDelegate.route(searchCriteriaDto);
}
}
@Service
public class SearchDelegate {
public static final String SEARCH_EVENT_NAME = "SEARCH";
public static final String COMPLETE_EVENT_NAME = "COMPLETE";
public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";
@Autowired
private SearchService searchService;
private ExecutorService executor = Executors.newCachedThreadPool();
public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
SseEmitter emitter = new SseEmitter();
executor.execute(() -> {
try {
if(!searchCriteriaDto.getCustomerSources().isEmpty()) {
searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
try {
SearchResponse searchResponse = searchService.search(searchCriteriaDto);
emitter.send(SseEmitter.event()
.id(customerSource.getSourceId())
.name(SEARCH_EVENT_NAME)
.data(searchResponse));
} catch (Exception e) {
log.error("Error while executing query for customer {} with source {}, Caused by {}",
customerId, source.getType(), e.getMessage());
}
});
}else {
log.debug("No available customerSources for the specified customer");
}
emitter.send(SseEmitter.event().
id(String.valueOf(System.currentTimeMillis()))
.name(COMPLETE_EVENT_NAME)
.data(COMPLETE_EVENT_DATA));
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}
Client Side:
name
of event on our SseEmitter
, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener()
to listen for named events. (Notice: The onmessage
handler is called if no event name is specified for a message)EventSource
on the COMPLETE
event to release the client connection. https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');
sse.addEventListener("SEARCH", function(evt) {
var data = JSON.parse(evt.data);
console.log(data);
});
sse.addEventListener("COMPLETE", function(evt) {
console.log(evt);
sse.close();
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With