I have a Spring WebFlux application where, at some point, I need to execute a heavy task in the background. The caller (an HTTP request) does not need to wait until that task completes.
Without Reactor, I would probably just use the @Async annotation and execute that method on a different thread. With Reactor, I am not sure whether I should proceed with that approach or whether there is already a built-in mechanism that allows me to accomplish this.
For example, given a Controller that accepts a Resource object:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
processor.run(r); // the caller should not wait for the resource to be processed
return repository.save(r);
}
And a Processor class:
@Async
void run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.retrieve()
.bodyToMono(String.class);
String response = result.block(); //block for now
}
The HTTP caller for /create should not need to wait until the run method completes.
In brief, when we annotate a bean method with @Async, Spring executes it in a separate thread, and the caller of the method does not wait for it to complete.
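To make the "caller does not wait" behaviour concrete, here is a minimal plain-JDK sketch of the same idea. It is only an analogy, not Spring's actual @Async machinery: the class name `AsyncAnalogy`, the method `runInBackground`, and the single-thread executor are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncAnalogy {
    // Hypothetical stand-in for the executor that would run @Async methods.
    static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "background-worker");
        thread.setDaemon(true); // do not keep the JVM alive for background work
        return thread;
    });

    // Submits the task and returns immediately; the future is only used to observe it.
    static CompletableFuture<String> runInBackground(CountDownLatch release) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                release.await(); // simulate slow work: wait until we are allowed to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Thread.currentThread().getName();
        }, EXECUTOR);
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<String> task = runInBackground(release);

        // The caller gets control back while the task is still running.
        System.out.println("caller continues, task done? " + task.isDone());

        release.countDown(); // let the background task finish
        System.out.println("task ran on: " + task.get(5, TimeUnit.SECONDS));
    }
}
```

The caller only touches the future here to show that the work is still in flight; in a real fire-and-forget call the return value would simply be ignored.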
In Spring Async, Filters and Servlets work synchronously, whereas Spring WebFlux supports fully asynchronous communication. Spring WebFlux is also compatible with a wider range of web and application servers than Spring Async, such as Netty and Undertow.
Mixing both runtime models within the same container is not a good idea: both infrastructures will compete for the same job (for example, serving static resources, the mappings, etc.), and the result is likely to perform badly or not work at all.
Spring WebFlux is built on Project Reactor, an implementation of the Reactive Streams specification. Reactor provides two types: Mono, which implements Publisher and emits 0 or 1 elements, and Flux, which implements Publisher and emits 0 to N elements.
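As a small illustration of the two types, the following sketch builds a Mono with one element, an empty Mono, and a Flux with several elements. It assumes reactor-core is on the classpath; the class and method names are hypothetical, and `block()` is used only so the results can be printed from a plain `main` method, not as something to do inside a WebFlux handler.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxSketch {
    // A Mono that emits exactly one element.
    static Optional<String> oneElement() {
        return Mono.just("a").blockOptional();
    }

    // A Mono that completes without emitting anything.
    static Optional<String> noElement() {
        return Mono.<String>empty().blockOptional();
    }

    // A Flux that emits several elements, collected into a list for display.
    static List<Integer> manyElements() {
        return Flux.just(1, 2, 3).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("Mono with a value: " + oneElement());
        System.out.println("Empty Mono: " + noElement());
        System.out.println("Flux elements: " + manyElements());
    }
}
```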
If you are looking for a fire-and-forget implementation, you can simply subscribe to your publisher:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
run(r).subscribe();
return repository.save(r);
}
Mono<Void> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class)
.then();
}
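If your publisher executes blocking operations, it should be subscribed on another thread via subscribeOn, using a scheduler intended for blocking work such as Schedulers.boundedElastic() (Schedulers.elastic() in older Reactor versions). The parallel scheduler is meant for CPU-bound, non-blocking work.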
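A minimal sketch of that advice, assuming reactor-core 3.4 or later on the classpath: the blocking call is wrapped lazily with `Mono.fromCallable` and moved off the calling thread with `subscribeOn`. The names `FireAndForgetSketch` and `blockingWork` are hypothetical stand-ins for the heavy task, and the latch exists only so the demo can wait for the background work before exiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FireAndForgetSketch {
    // Hypothetical blocking task standing in for the heavy work.
    static String blockingWork(String input) throws InterruptedException {
        Thread.sleep(100); // simulate a slow, blocking call
        return "processed " + input;
    }

    // Defers the blocking call until subscription and runs it on a boundedElastic worker.
    static Mono<String> run(String input) {
        return Mono.fromCallable(() -> blockingWork(input))
                   .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> workerThread = new AtomicReference<>();

        // Fire and forget: subscribe() returns immediately; errors are only logged.
        run("r")
            .doOnNext(value -> workerThread.set(Thread.currentThread().getName()))
            .doFinally(signal -> done.countDown())
            .subscribe(value -> { }, error -> System.err.println("background task failed: " + error));

        System.out.println("caller continues on: " + Thread.currentThread().getName());

        // Demo only: wait so the JVM does not exit before the background work finishes.
        done.await(5, TimeUnit.SECONDS);
        System.out.println("work ran on: " + workerThread.get());
    }
}
```

Passing an error consumer to `subscribe` matters in a fire-and-forget setup, because nothing downstream will otherwise notice that the background task failed.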
I did some testing, and I think that even using subscribe() as fire-and-forget will wait for the request to complete before returning an answer to the web browser or REST client (at least it looks like that in my simple tests). So you have to do something similar to @Async and move the work to another thread:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
return processor.run(r)
.subscribeOn(Schedulers.boundedElastic()) // run everything above this line on another thread (Schedulers.elastic() is deprecated)
.flatMap(string -> repository.save(r)); // persist "r" unchanged; flatMap subscribes to the save and yields Mono<Resource>
}
And a Processor class:
Mono<String> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class);
}