Reactor GroupedFlux - wait to complete

Given an async publisher like the one below, is there a way in Project Reactor to wait until the entire stream has finished processing?
Ideally without having to add a sleep of unknown duration...

@Test
public void groupByPublishOn() throws InterruptedException {
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    List<Integer> results = new ArrayList<>();
    Flux<Flux<Integer>> groupPublisher = processor.publish(1)
                                                  .autoConnect()
                                                  .groupBy(i -> i % 2)
                                                  .map(group -> group.publishOn(Schedulers.parallel()));

    groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

    List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
    input.forEach(processor::onNext);
    processor.onComplete();

    Thread.sleep(500);

    Assert.assertTrue(results.size() == input.size());
}
MirceaG Avatar asked Feb 02 '18 13:02



2 Answers

You can replace these lines:

groupPublisher.log()
              .subscribe(g -> g.log()
                               .subscribe(results::add));

with this:

groupPublisher.log()
              .flatMap(g -> g.log()
                             .doOnNext(results::add)
              )
              .blockLast();

flatMap is a better pattern than subscribe-within-subscribe and will take care of subscribing to the group for you.

doOnNext takes care of the consuming side-effect (adding values to the collection), freeing you up from the need to perform that in the subscription.

blockLast() replaces the subscription: rather than letting you provide handlers for the events, it blocks until completion (and returns the last emitted item, but you would already have handled that in doOnNext).
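
Conceptually, "block until completion" means parking the calling thread until a terminal signal arrives. The sketch below illustrates that idea with the JDK's own Flow API and a CountDownLatch instead of Reactor, so it is an analogy and not how Reactor implements blockLast(). The names LatchWait and collect are made up for illustration. It also collects into a CopyOnWriteArrayList, since the items are added from a publisher thread rather than the calling thread; the plain ArrayList in the question is probably exposed to the same concern once the groups run on Schedulers.parallel().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Reactor: waits for an async JDK publisher to finish.
final class LatchWait {

    /** Publishes the input asynchronously and waits (bounded) for the terminal signal. */
    static List<Integer> collect(List<Integer> input, long timeoutSeconds) {
        List<Integer> results = new CopyOnWriteArrayList<>(); // written from a publisher thread
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { results.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            input.forEach(publisher::submit);
        } // leaving the try block calls close(), which signals onComplete to the subscriber

        try {
            // Bounded wait: give up instead of hanging forever if no terminal signal arrives.
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 3, 5, 2, 4, 6, 11, 12, 13);
        System.out.println("received " + collect(input, 10).size() + " of " + input.size());
    }
}
```

In a Reactor pipeline the same shape would come from releasing the latch in a terminal hook and awaiting it in the test, but blockLast() already does the waiting for you, so the latch is mostly useful when you cannot block on the pipeline itself.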

Simon Baslé Avatar answered Oct 19 '22 16:10



The main problem with blockLast() is that it never returns if your pipeline is unable to finish.

Instead, you can keep the Disposable returned by subscribe() and check whether the pipeline has finished, in which case isDisposed() returns true.

It is then up to you to decide whether you want a timeout, as in the lazy counter implementation below :)

int count = 0;

@Test
public void checkIfItDisposable() throws InterruptedException {
    Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .map(number -> {
                try {
                    Thread.sleep(500); // simulate slow work
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return number;
            }).subscribeOn(Schedulers.newElastic("1"))
            .subscribe();

    // Poll until the pipeline terminates, giving up after 100 checks (about 40 seconds).
    while (!subscribe.isDisposed() && count < 100) {
        Thread.sleep(400);
        count++;
        System.out.println("Waiting......");
    }
    System.out.println("Is disposed: " + subscribe.isDisposed());
}
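
The polling loop above can be generalized so the limit is a duration rather than an iteration count. This is only a sketch using plain JDK classes; PollingWait and awaitTrue are hypothetical names, not Reactor API.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a condition until it holds or a deadline passes.
final class PollingWait {

    /**
     * Returns true as soon as the condition holds, or false once the timeout
     * has elapsed without the condition becoming true.
     */
    static boolean awaitTrue(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return condition.getAsBoolean();
            }
        }
        return true;
    }
}
```

With the Disposable from the test above, the call would look like PollingWait.awaitTrue(subscribe::isDisposed, Duration.ofSeconds(40), Duration.ofMillis(400)), and the test can assert on the returned boolean instead of printing it.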

And if you do want to use blockLast, at least add a timeout:

@Test
public void checkIfItDisposableBlocking() throws InterruptedException {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .map(number -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return number;
            }).subscribeOn(Schedulers.newElastic("1"))
            .blockLast(Duration.of(60, ChronoUnit.SECONDS));
    System.out.println("It disposable");
}

You can see more Reactor examples here if you need more ideas: https://github.com/politrons/reactive

paul Avatar answered Oct 19 '22 15:10
