Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using reactor's Flux.buffer to batch work only works for single item

Tags:

I'm trying to use Flux.buffer() to batch up loads from a database.

The use case is that loading records from a DB may be 'bursty', and I'd like to introduce a small buffer to group together loads where possible.

My conceptual approach has been to use some form of processor, publish to it's sink, let that buffer, and then subscribe & filter for the result I want.

I've tried multiple different approaches (different types of processors, creating the filtered Mono in different ways).

Below is where I've gotten so far - largely by stumbling.

Currently, this returns a single result, but subsequent calls are dropped (though I'm unsure of where).

class BatchLoadingRepository {     // I've tried all manner of different processors here.  I'm unsure if     // TopicProcessor is the correct one to use.     private val bufferPublisher = TopicProcessor.create<String>()     private val resultsStream = bufferPublisher             .bufferTimeout(50, Duration.ofMillis(50))             // I'm unsure if concatMapIterable is the correct operator here,              // but it seems to work.             // I'm really trying to turn the List<MyEntity>              // into a stream of MyEntity, published on the Flux<>             .concatMapIterable { requestedIds ->                 // this is a Spring Data repository.  It returns List<MyEntity>                 repository.findAllById(requestedIds)             }      // Multiple callers will invoke this method, and then subscribe to receive     // their entity back.     fun findByIdAsync(id: String): Mono<MyEntity> {          // Is there a potential race condition here, caused by a result         // on the resultsStream, before I've subscribed?         return Mono.create<MyEntity> { sink ->             bufferPublisher.sink().next(id)             resultsStream.filter { it.id == id }                     .subscribe { next ->                         sink.success(next)                     }         }     } } 
like image 492
Marty Pitt Avatar asked Mar 15 '19 17:03

Marty Pitt


People also ask

Does flux subscribe block?

Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes.

How do you convert flux objects to Mono?

Instead of take(1) , you could use next() . This will transform the Flux into a valued Mono by taking the first emitted item, or an empty Mono if the Flux is empty itself.

How do I know if my flux is empty?

You can use hasElements method of Flux to check whether Flux completes empty. It emits a single boolean true if this Flux sequence has at least one element.

How do you get data from flux?

How to extract data from Flux in Java? Another way would be using the Reactive Streams operators like onNext, flatMap, etc. In the below example, we are using the onNext() method to get data from Flux and print it. Note: We need to subscribe to the Publisher.


1 Answers

Hi i was testing your code and i think the best way is to use EmitterProcessor shared. I did a test with emitterProcessor and it seems to work.

Flux<String> fluxi; EmitterProcessor emitterProcessor;  @Override public void run(String... args) throws Exception {     emitterProcessor = EmitterProcessor.create();      fluxi = emitterProcessor.share().bufferTimeout(500, Duration.ofMillis(500))             .concatMapIterable(o -> o);      Flux.range(0,1000)             .flatMap(integer -> findByIdAsync(integer.toString()))             .map(s -> {                 System.out.println(s);                 return s;             }).subscribe();  }  private Mono<String> findByIdAsync(String id) {     return Mono.create(monoSink -> {         fluxi.filter(s -> s == id).subscribe(value -> monoSink.success(value));         emitterProcessor.onNext(id);     }); } 
like image 114
Ricard Kollcaku Avatar answered Oct 15 '22 08:10

Ricard Kollcaku