This question is related to "Return immediately in spring web flux", but I don't think it's the same (at least the answer there is not satisfactory for me).
I have a function returning a Mono that, when invoked, starts a long-running job. This function is invoked when a call is made to a Spring WebFlux HTTP API. Here's an example:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob: Mono<Job> = startNewJob(jobId)
    // The response is emitted only once the job Mono has produced its value
    return longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobUri).build<Unit>()
    }
}
The problem with the code above is that "201 Created" is returned only after the long-running job has completed. I want to kick off longRunningJob in the background and return "201 Created" immediately.
I could perhaps do something like this:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    // Fire and forget: the job runs on its own thread
    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()
    // No Job instance is available here, so the URI is built from the path variable
    val jobUri = generateJobUri(request, jobId)
    val response = ResponseEntity.created(jobUri).build<Unit>()
    return Mono.just(response)
}
But it doesn't seem very idiomatic to me to have to call subscribe() manually (e.g. IntelliJ complains that I call subscribe() in a non-blocking scope). Isn't there a better way to compose the two "streams" without using an explicit subscribe? If so, how do I modify the startNewJob function above to achieve this?
In Spring WebFlux, you call reactive APIs that return Mono and Flux, and your controllers return Mono and Flux as well. Invoking an API that returns a Mono or a Flux returns immediately; by default the underlying work starts only when the publisher is subscribed to.
The reactive-stack web framework, Spring WebFlux, was added in Spring 5.0. It is fully non-blocking, supports Reactive Streams back-pressure, and runs on servers such as Netty, Undertow, and Servlet 3.1+ containers.
If you are familiar with the Spring MVC programming style, you can easily work with WebFlux as well. Spring WebFlux uses Project Reactor as its reactive library. Reactor is a Reactive Streams library, so all of its operators support non-blocking back-pressure.
AFAIK, using one of the subscribe methods is the only way to really start a job in the background with its own lifecycle (not tied to the returned publisher).
If you were to use one of the operators to combine the job publisher and the response publisher (e.g. zip or merge), then the lifecycle of the job publisher would be tied to the response publisher, which is not what you want for a background job.
One thing you might want to consider is kicking off the background job within the response publisher stream, rather than directly in the method body, e.g. via doOnSubscribe or from an operator upstream of the response.
This would tie the start of the background job to the onSubscribe events of the response publisher, but still allow it to complete in the background.
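A minimal sketch of the doOnSubscribe idea, using plain Reactor so it stays self-contained. The names startInBackground and the stand-in startNewJob, the two-second delay, and the "/jobs/..." response string are all assumptions for illustration; in the real controller the response would be the ResponseEntity from the question. Schedulers.boundedElastic() is used here instead of a new single-threaded scheduler per request, which assumes a reasonably recent reactor-core.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.time.Duration

// Hypothetical stand-in for the long-running job from the question.
fun startNewJob(jobId: String): Mono<String> =
    Mono.delay(Duration.ofSeconds(2)).map { "job $jobId finished" }

// Builds the response publisher. The job is started only when the
// response publisher is subscribed to, and then runs independently of it.
fun startInBackground(jobId: String): Mono<String> =
    Mono.just("201 Created: /jobs/$jobId")
        .doOnSubscribe {
            startNewJob(jobId)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(
                    { result -> println(result) },
                    // Without an error consumer, failures would go to Reactor's dropped-error hook
                    { error -> println("job $jobId failed: $error") }
                )
        }
```

The explicit subscribe call is still there; it has only moved inside the response stream, so nothing starts if the framework never subscribes to the returned Mono.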
Also note that if you want to be able to cancel the background job (e.g. during application shutdown), you'll need to save the Disposable returned from subscribe so you can later call dispose on it. This might be better done from some type of BackgroundJobManager that keeps track of all the running jobs.
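One way such a BackgroundJobManager could look, as a rough sketch only. The class and its methods (start, cancel, cancelAll, runningCount) are hypothetical names, not part of Spring or Reactor; the only library calls used are Mono.doFinally, Mono.subscribe, and the Disposable interface.

```kotlin
import reactor.core.Disposable
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper that tracks background subscriptions by job id.
class BackgroundJobManager {
    private val running = ConcurrentHashMap<String, Disposable>()

    // Subscribes to the job and remembers the Disposable until the job ends.
    fun <T> start(jobId: String, job: Mono<T>) {
        val disposable = job
            .doFinally { running.remove(jobId) } // runs on complete, error, or cancel
            .subscribe({ }, { error -> println("job $jobId failed: $error") })
        running[jobId] = disposable
        // The job may already have finished before the entry was stored.
        if (disposable.isDisposed) running.remove(jobId, disposable)
    }

    // Cancels one job; returns false if no such job is running.
    fun cancel(jobId: String): Boolean {
        val disposable = running.remove(jobId) ?: return false
        disposable.dispose()
        return true
    }

    // Cancels everything, e.g. from an application shutdown hook.
    fun cancelAll() {
        running.values.forEach { it.dispose() }
        running.clear()
    }

    fun runningCount(): Int = running.size
}
```

In a Spring application this would presumably be a singleton bean whose cancelAll is invoked on shutdown, with the controller calling start instead of subscribing directly.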