Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Running a Mono in background while returning a response when using Spring Webflux

This questions is related to Return immediately in spring web flux but I don't think it's the same (at least the answer there is not satisfactory for me).

I have a function returning a Mono that when invoked starts a long-running job. This function is invoked when a call is made to a Spring Webflux HTTP API. Here's an example:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob : Mono<Job> = startNewJob(jobId)
    longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobURI).build<Unit>()
    }
}

The problem with the code above is that "201 Created" is created after the long running job is completed. I want to kick-off the longRunningJob in the background and return "201 Created" immediately.

I could perhaps do something like this:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {

    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()

    val jobUri = generateJobUri(request, job.id)
    val response = ResponseEntity.created(jobURI).build<Unit>()
    Mono.just(response)
}

But it doesn't seem very idiomatic to me to have to call subscribe() manually (e.g. intellij is complaining that I call subscribe() in non-blocking scope). Isn't there a better way to compose the two "streams" without using an explicit subscribe? If so how do I modify the startNewJob function above to achieve this?

like image 499
Johan Avatar asked Apr 09 '19 13:04

Johan


People also ask

How do I return Monos and fluxes in spring webflux?

In Spring WebFlux, we call reactive APIs/functions that return monos and fluxes and your controllers will return monos and fluxes. When you invoke an API that returns a mono or a flux, it will return immediately.

Does spring webflux support Reactive Streams?

Join the DZone community and get the full member experience. The reactive-stack web framework, Spring WebFlux, has been added to Spring 5.0. It is fully non-blocking, supports reactive streams back-pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.

How to use spring monowebflux with Maven repository?

Run the Spring MonoWebFlux Application in order to see if all works good. If not it is possible not to have the Maven Repository OK and you must run the following commands: Once the project is running without errors add the code to get the Spring MonoWebFlux Application main class like this:

Can I use Spring MVC with webflux?

If you are familiar with Spring MVC programming style, you can easily work on webflux also. Spring webflux uses project reactor as reactive library. Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure.


1 Answers

AFAIK, using one of the subscribe methods is the only way to really start a job in the background with its own lifecycle (not tied to the returned publisher).

If you were to use one of the operators to combine the job publisher and the response publisher (e.g. zip or merge), then the lifecycle of the job publisher would be tied to the response publisher, which is not what you want for a background job.

One thing you might want to consider is kicking off the background job within the response publisher stream, rather than directly in the method body. e.g. via doOnSubscibe or from an operator upstream of the response.

This would tie the start of the background job to the onSubscribe events of the response publisher, but still allow it to complete in the background.

Also note, that if you want to be able to cancel the background job (e.g. maybe during application shutdown), you'll need to save the Disposable returned from subscribe so you can later call dispose on it. This might be better done from some type of BackgroundJobManager that could keep track of all the jobs running.

like image 110
Phil Clay Avatar answered Oct 03 '22 11:10

Phil Clay