I'm trying to use Flux.buffer() to batch up loads from a database.
The use case is that loading records from a DB may be 'bursty', and I'd like to introduce a small buffer to group together loads where possible.
My conceptual approach has been to use some form of processor, publish to its sink, let that buffer, and then subscribe and filter for the result I want.
I've tried multiple different approaches (different types of processors, creating the filtered Mono in different ways).
Below is where I've gotten so far - largely by stumbling.
Currently, this returns a single result, but subsequent calls are dropped (though I'm unsure of where).
class BatchLoadingRepository {
    // I've tried all manner of different processors here. I'm unsure if
    // TopicProcessor is the correct one to use.
    private val bufferPublisher = TopicProcessor.create<String>()
    private val resultsStream = bufferPublisher
        .bufferTimeout(50, Duration.ofMillis(50))
        // I'm unsure if concatMapIterable is the correct operator here,
        // but it seems to work.
        // I'm really trying to turn the List<MyEntity>
        // into a stream of MyEntity, published on the Flux<>
        .concatMapIterable { requestedIds ->
            // this is a Spring Data repository. It returns List<MyEntity>
            repository.findAllById(requestedIds)
        }

    // Multiple callers will invoke this method, and then subscribe to receive
    // their entity back.
    fun findByIdAsync(id: String): Mono<MyEntity> {
        // Is there a potential race condition here, caused by a result
        // on the resultsStream, before I've subscribed?
        return Mono.create<MyEntity> { sink ->
            bufferPublisher.sink().next(id)
            resultsStream.filter { it.id == id }
                .subscribe { next -> sink.success(next) }
        }
    }
}
Hi, I was testing your code, and I think the best way is to use a shared EmitterProcessor. I did a test with EmitterProcessor and it seems to work.
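import java.time.Duration;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux<String> fluxi;
EmitterProcessor<String> emitterProcessor;

@Override
public void run(String... args) throws Exception {
    emitterProcessor = EmitterProcessor.create();
    // share() lets every caller subscribe to the same buffered stream
    fluxi = emitterProcessor.share()
            .bufferTimeout(500, Duration.ofMillis(500))
            .concatMapIterable(o -> o);

    // Simulate 1000 lookups and print each id as it comes back
    Flux.range(0, 1000)
            .flatMap(integer -> findByIdAsync(integer.toString()))
            .doOnNext(System.out::println)
            .subscribe();
}

private Mono<String> findByIdAsync(String id) {
    return Mono.create(monoSink -> {
        // Subscribe first, then emit, so the result cannot arrive unobserved.
        // Compare with equals(): == only checks reference identity.
        fluxi.filter(s -> s.equals(id)).subscribe(value -> monoSink.success(value));
        emitterProcessor.onNext(id);
    });
}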
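For what it's worth, the part of findByIdAsync above that bears on the race condition raised in the question is the ordering: it subscribes to the filtered stream first and only then emits the id, whereas the question's code emits first. The same "register the waiter, then enqueue the request" idea can be sketched without Reactor, using only java.util.concurrent. This is a swapped-in illustration of the pattern, not Reactor's API. BatchLoader, load, flush, start and bulkLookup are all hypothetical names, and bulkLookup merely stands in for something like repository.findAllById.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper: collects ids requested within a short window and
// resolves them with a single bulk lookup.
class BatchLoader<V> {
    // One waiter per id; registering the waiter IS the enqueue, so a result
    // can never be produced before someone is listening for it.
    private final Map<String, CompletableFuture<V>> pending = new ConcurrentHashMap<>();
    private final Function<List<String>, Map<String, V>> bulkLookup;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "batch-loader");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    BatchLoader(Function<List<String>, Map<String, V>> bulkLookup) {
        this.bulkLookup = bulkLookup;
    }

    // Flush pending ids every windowMillis milliseconds.
    void start(long windowMillis) {
        scheduler.scheduleWithFixedDelay(this::flush, windowMillis, windowMillis,
                TimeUnit.MILLISECONDS);
    }

    // Callers asking for the same id before a flush share one future.
    CompletableFuture<V> load(String id) {
        return pending.computeIfAbsent(id, key -> new CompletableFuture<>());
    }

    // Resolve everything currently pending with one bulk call.
    void flush() {
        List<String> ids = new ArrayList<>(pending.keySet());
        if (ids.isEmpty()) {
            return;
        }
        try {
            Map<String, V> found = bulkLookup.apply(ids);
            for (String id : ids) {
                CompletableFuture<V> waiter = pending.remove(id);
                if (waiter != null) {
                    waiter.complete(found.get(id)); // null when the id was not found
                }
            }
        } catch (RuntimeException e) {
            // Fail the waiters instead of killing the scheduled task.
            for (String id : ids) {
                CompletableFuture<V> waiter = pending.remove(id);
                if (waiter != null) {
                    waiter.completeExceptionally(e);
                }
            }
        }
    }
}

class BatchLoaderDemo {
    public static void main(String[] args) {
        BatchLoader<String> loader = new BatchLoader<String>(ids -> {
            Map<String, String> out = new HashMap<>();
            for (String id : ids) {
                out.put(id, "entity-" + id);
            }
            return out;
        });
        loader.start(50);
        CompletableFuture<String> first = loader.load("1");
        CompletableFuture<String> second = loader.load("2");
        System.out.println(first.join() + ", " + second.join());
    }
}
```

An id requested while a flush is in progress simply stays in the pending map and is picked up by the next flush, so nothing is dropped. If you stay with Reactor, the equivalent is what the answer already does: subscribe before you emit.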