Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Working with Flux in a Scheduled Task

I am working on a Spring Webflux project and have encountered an issue trying to publish and consume Flux's in a Scheduled Task.

@Scheduled(fixedRate = 20*1000)
fun updateNews() {
    try {
        logger.info("Automatic Update at: ${LocalDateTime.now()}")
        articleRepository.saveAll(
                sourceRepository.findAll().publishOn(Schedulers.parallel())
                        .map { source -> source.generate() }
                        .flatMap { it?.read() ?: Flux.empty() })
                        .timeout(Duration.ofSeconds(20)
        ).subscribeOn(Schedulers.parallel())
    } catch(e: Throwable) {
        logger.log(Level.SEVERE, "Error in Scheduler", e)
    }
}

My configured Scheduler:

ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))

This task will never completed unless I intentionally block on the end:

.then().block()

I originally did not bother with the direct reference to publish/subscribe to the Schedulers, and I have tried all of the options that seemed reasonable to no effect.

My log event occurs, but it seems as though when the thread for this task from the scheduler dies, the flux's are trashes as well; even though they should be in their own thread pool once I have specified the publishOn and subscribeOn behavior?

I would like to make this action completely reactive, any advice would be appreciated.

like image 516
dillius Avatar asked Mar 07 '23 19:03

dillius


1 Answers

@Scheduled isn't integrated with Flux, so it wouldn't know what to do with the Flux should you return it. Combine that with the fact that in Reactor (and Reactive Streams in general), nothing generally happens until you subscribe(), and you can start to see what went wrong.

block() is actually a form of subscribe(), that's why it works once you add it to the code. it is actually probably the best option here since you need to bridge a reactive piece of code (from the ReactiveRepository) into the imperative blocking world (your @Scheduled fun).

like image 154
Simon Baslé Avatar answered Mar 27 '23 13:03

Simon Baslé