
Fire and forget with Reactor

I have a method like the one below in my Spring Boot app.

public Flux<Data> search(SearchRequest request) {
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;
}

//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
  //do some processing here
}

Currently, I'm using an @Async-annotated service class for doThisAsync, but I don't know how to pass it the List<Data>, because I don't want to call block(). All I have is a Mono<List<Data>>.

My main problem is how to process this Mono separately while the search method still returns the Flux<Data>.

Avinash Anand asked Aug 20 '19 03:08



2 Answers

1. If your fire-and-forget operation is already async and returns a Mono/Flux

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public Mono<Void> doThisAsync(List<Data> data) {
    // do some async/non-blocking processing here, like calling WebClient,
    // and return the resulting Mono; Mono.empty() is only a placeholder
    return Mono.empty();
}

2. If your fire-and-forget operation does blocking I/O

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.boundedElastic())  // run on a thread meant for blocking work so the main flow is not blocked (elastic() is deprecated since Reactor 3.4)
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public void doThisAsync(List<Data> data) {
    //do some blocking I/O on calling thread
}

Note that in both of the above cases you lose backpressure support. If doThisAsync slows down for some reason, the data producer won't care and will keep producing items. This is a natural consequence of the fire-and-forget mechanism.
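
The lost backpressure can be illustrated without Reactor at all. The sketch below is a JDK-only analogy, not the code from this answer: FireAndForgetBacklog and backlogAfterSubmitting are hypothetical names, and a single-threaded ThreadPoolExecutor with an unbounded queue stands in for the subscription that runs doThisAsync. The consumer is stalled on a latch while the caller keeps handing over batches. Nothing slows the caller down, so the work simply piles up.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for the fire-and-forget hand-off; this is not Reactor code.
final class FireAndForgetBacklog {

    // Hands over `batches` fire-and-forget tasks while the consumer is stalled and
    // returns how many of them are still waiting in the executor's queue.
    static int backlogAfterSubmitting(int batches) {
        CountDownLatch consumerMayProceed = new CountDownLatch(1);
        // One worker thread and an unbounded queue: nothing ever pushes back on the caller.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < batches; i++) {
                List<String> data = List.of("batch-" + i);
                // The caller returns from execute() immediately, whether or not the consumer keeps up.
                executor.execute(() -> doThisAsync(data, consumerMayProceed));
            }
            return executor.getQueue().size();
        } finally {
            // Release the stalled consumer and let the queued work drain.
            consumerMayProceed.countDown();
            executor.shutdown();
        }
    }

    // Simulates a slow consumer: it does nothing with the data until the gate opens.
    static void doThisAsync(List<String> data, CountDownLatch gate) {
        try {
            gate.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Queued batches: " + backlogAfterSubmitting(100));
    }
}
```

With 100 batches the method reports 99 queued: the stalled worker holds one and the rest wait in memory. In a real service that backlog keeps growing until the consumer catches up, so a bounded queue or some form of rate limiting is worth considering if the fire-and-forget work can be slow.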

Martin Tarjányi answered Oct 05 '22 10:10



Have you considered running the processing on a separate thread using publishOn, as in the example below? This may not be exactly what you are asking for, but it lets you continue with other matters while the collected results are processed on a thread from a dedicated scheduler (theFourThreadScheduler, a pool of four threads in my example).

    @Test
    public void processingInSeparateThreadTest() {
        final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
        final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");

        theResultFlux.log()
            .collectList()
            .publishOn(theFourThreadScheduler)
            .subscribe(theStringList -> {
                doThisAsync(theStringList);
            });

        System.out.println("Subscribed to the result flux");

        for (int i = 0; i < 20; i++) {
            System.out.println("Waiting for completion: " + i);
            try {
                Thread.sleep(300);
            } catch (final InterruptedException theException) {
                Thread.currentThread().interrupt();  // restore the interrupt flag
                break;                               // and stop waiting
            }
        }
    }

    private void doThisAsync(final List<String> inStringList) {
        for (final String theString : inStringList) {
            System.out.println("Processing in doThisAsync: " + theString);
            try {
                Thread.sleep(500);
            } catch (final InterruptedException theException) {
                Thread.currentThread().interrupt();  // restore the interrupt flag
                return;                              // and stop processing
            }
        }
    }

Running the example produces the following output, showing that the processing in doThisAsync() is performed in the background.

Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19

References: Reactor 3 Reference: Schedulers

Ivan Krizsan answered Oct 05 '22 10:10
