Given an async publisher like the one below, is there a way with Project Reactor to wait until the entire stream has finished processing?
Of course, without having to add a sleep of unknown duration...
@Test
public void groupByPublishOn() throws InterruptedException {
UnicastProcessor<Integer> processor = UnicastProcessor.create();
List<Integer> results = new ArrayList<>();
Flux<Flux<Integer>> groupPublisher = processor.publish(1)
.autoConnect()
.groupBy(i -> i % 2)
.map(group -> group.publishOn(Schedulers.parallel()));
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
input.forEach(processor::onNext);
processor.onComplete();
Thread.sleep(500);
Assert.assertTrue(results.size() == input.size());
}
You can replace these lines:
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
with this:
groupPublisher.log()
.flatMap(g -> g.log()
.doOnNext(results::add)
)
.blockLast();
flatMap is a better pattern than subscribe-within-subscribe and will take care of subscribing to the group for you.
doOnNext takes care of the consuming side-effect (adding values to the collection), freeing you up from the need to perform that in the subscription.
blockLast() replaces the subscription, and rather than letting you provide handlers for the events it blocks until the completion (and returns the last emitted item, but you would already have taken care of that within doOnNext).
The main problem with blockLast() is that it never releases the calling thread if the pipeline is unable to finish.
An alternative is to keep the Disposable returned by subscribe() and check whether the pipeline has finished, in which case isDisposed() returns true.
Then it's up to you to decide whether you want a timeout, like the lazy counter implementation below :)
int count = 0;
@Test
public void checkIfItDisposable() throws InterruptedException {
Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
}).subscribeOn(Schedulers.newElastic("1"))
.subscribe();
while (!subscribe.isDisposed() && count < 100) {
Thread.sleep(400);
count++;
System.out.println("Waiting......");
}
System.out.println("Is disposed: " + subscribe.isDisposed());
}
And in case you want to use blockLast, at least add a timeout:
@Test
public void checkIfItDisposableBlocking() throws InterruptedException {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
}).subscribeOn(Schedulers.newElastic("1"))
.blockLast(Duration.of(60, ChronoUnit.SECONDS));
System.out.println("It disposable");
}
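As an alternative to polling isDisposed() in a loop, a bounded wait can also be expressed with a latch. The sketch below uses only the JDK so it runs on its own: a background thread stands in for the Reactor pipeline, and the class and method names (LatchWaitSketch, processAndWait) are hypothetical. With Reactor, the countDown() call would presumably go in a terminal callback such as doFinally or doOnTerminate; that wiring is an assumption and is not shown here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    // Hypothetical helper: processes `itemCount` items on a background thread
    // and waits at most `timeoutMillis` for the work to finish.
    public static List<Integer> processAndWait(int itemCount, long timeoutMillis)
            throws InterruptedException {
        // Thread-safe list, since items are added from another thread.
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> {
                try {
                    for (int i = 1; i <= itemCount; i++) {
                        results.add(i); // stands in for the doOnNext side effect
                    }
                } finally {
                    done.countDown(); // stands in for a terminal callback
                }
            });
            // Bounded wait: await() returns false if the timeout elapses first.
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
            return results;
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> results = processAndWait(9, 5_000);
        System.out.println("Processed " + results.size() + " items");
    }
}
```

Compared with the sleep-and-check loop, the waiting thread wakes up as soon as the work completes, and the timeout is still there as a safety net.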
You can see more Reactor examples here if you need more ideas: https://github.com/politrons/reactive