
How to wait for async Observable to complete

Tags: java, rx-java

I'm trying to build a sample using RxJava. The sample should orchestrate a ReactiveWareService and a ReactiveReviewService, returning a WareAndReview composite.

ReactiveWareService:

public Observable<Ware> findWares() {
    return Observable.from(wareService.findWares());
}

ReactiveReviewService (reviewService.findReviewsByItem calls Thread.sleep to simulate latency):

public Observable<Review> findReviewsByItem(final String item) {
    return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
        try {
            List<Review> reviews = reviewService.findReviewsByItem(item);
            reviews.forEach(observer::onNext);
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }
    }));
}

public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
    final List<WareAndReview> wareAndReviews = new ArrayList<>();

    wareService.findWares()
        .map(WareAndReview::new)
        .subscribe(wr -> {
            wareAndReviews.add(wr);
            //Async!!!!
            reviewService.findReviewsByItem(wr.getWare().getItem())
                .subscribe(wr::addReview,
                    throwable -> System.out.println("Error while trying to find reviews for " + wr)
                );
        });

    //TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {}

    return wareAndReviews;
}

Given that I don't want to return an Observable, how can I wait for the asynchronous Observable (findReviewsByItem) to complete?

user3753094 asked Jun 18 '14 15:06





1 Answer

Most of your example can be rewritten with standard RxJava operators that work together well:

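import java.util.List;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class Example {

    // executor, wareService and reviewService are assumed to be fields of this class;
    // wareService.findWares() is the reactive version returning Observable<Ware>.
    Scheduler scheduler = Schedulers.from(executor);

    // Wraps the blocking reviewService call and runs it on the executor-backed scheduler.
    public Observable<Review> findReviewsByItem(final String item) {
        return Observable.just(item)
               .subscribeOn(scheduler)
               .flatMapIterable(reviewService::findReviewsByItem);
    }

    public List<WareAndReview> findWaresWithReviews() {
        return wareService
               .findWares()
               .map(WareAndReview::new)
               .flatMap(wr -> {
                   // Uses the reactive findReviewsByItem defined above.
                   return findReviewsByItem(wr.getWare().getItem())
                          .doOnNext(wr::addReview)  // attach each review as it arrives
                          .lastOrDefault(null)      // emit once on completion, even with no reviews
                          .map(v -> wr);            // pass the populated composite downstream
               })
               .toList()      // collect all composites into one list
               .toBlocking()  // block only here, at the very end
               .first();
    }
}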

Whenever you want to compose services like this, think of flatMap first. You don't need to block for each sub-Observable but only at the very end with toBlocking() if really necessary.
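
To make the "compose first, block once at the end" idea concrete without any RxJava dependency, here is a minimal sketch that swaps in the JDK's CompletableFuture for Observable. It is an analogy, not the RxJava solution itself: the class name ComposeThenBlock, the String-based wares and reviews, and the 100 ms simulated latency are all hypothetical stand-ins for the services in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ComposeThenBlock {

    // Hypothetical stand-in for the slow, blocking reviewService.findReviewsByItem.
    static List<String> findReviewsByItem(String item) {
        try {
            Thread.sleep(100); // simulated latency (assumed value)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(item + "-review-1", item + "-review-2");
    }

    // Starts one asynchronous lookup per item, then blocks exactly once for all of them.
    static List<String> findWaresWithReviews(List<String> items, ExecutorService executor) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String item : items) {
            pending.add(CompletableFuture
                    .supplyAsync(() -> findReviewsByItem(item), executor)
                    .thenApply(reviews -> item + ": " + reviews));
        }

        // The single blocking point: wait until every lookup has completed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

        List<String> result = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            result.add(future.join()); // already completed, so this returns immediately
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            findWaresWithReviews(Arrays.asList("book", "lamp"), executor)
                    .forEach(System.out::println);
        } finally {
            executor.shutdown();
        }
    }
}
```

Here supplyAsync plus thenApply plays roughly the role of flatMap in the answer, and allOf(...).join() plays the role of toList().toBlocking().first(): no per-item waiting and no fixed Thread.sleep, just one wait at the end that returns as soon as the slowest lookup finishes.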

akarnokd answered Oct 10 '22 15:10
