I am working on a Spring Webflux project and have encountered an issue trying to publish and consume Flux's in a Scheduled Task.
@Scheduled(fixedRate = 20*1000)
fun updateNews() {
try {
logger.info("Automatic Update at: ${LocalDateTime.now()}")
articleRepository.saveAll(
sourceRepository.findAll().publishOn(Schedulers.parallel())
.map { source -> source.generate() }
.flatMap { it?.read() ?: Flux.empty() })
.timeout(Duration.ofSeconds(20)
).subscribeOn(Schedulers.parallel())
} catch(e: Throwable) {
logger.log(Level.SEVERE, "Error in Scheduler", e)
}
}
My configured Scheduler:
ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))
This task will never completed unless I intentionally block on the end:
.then().block()
I originally did not bother with the direct reference to publish/subscribe to the Schedulers, and I have tried all of the options that seemed reasonable to no effect.
My log event occurs, but it seems as though when the thread for this task from the scheduler dies, the flux's are trashes as well; even though they should be in their own thread pool once I have specified the publishOn and subscribeOn behavior?
I would like to make this action completely reactive, any advice would be appreciated.
@Scheduled
isn't integrated with Flux
, so it wouldn't know what to do with the Flux
should you return it. Combine that with the fact that in Reactor (and Reactive Streams in general), nothing generally happens until you subscribe()
, and you can start to see what went wrong.
block()
is actually a form of subscribe()
, that's why it works once you add it to the code. it is actually probably the best option here since you need to bridge a reactive piece of code (from the ReactiveRepository
) into the imperative blocking world (your @Scheduled fun
).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With