Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to chain asynchronous operations using Java RX Observable?

Tags:

java

rx-java

I want to make a HTTP request repeatedly and act on result. I start with public Observable<NewsItem> fetchItems(NewsFeed feed). One request gets a few news items but I decided to flatten it.

The idea was to use Observable.interval() make the request multiple times, then combine resulting Observables into one.

       Observable
            .interval(timePerItem, TimeUnit.MILLISECONDS)
            .map(i -> feed)
            .map(feed -> fetchItems(feed))
            .subscribe(result -> System.out.println(result));

But the result is Observable<Observable<NewsItem>> not Observable<NewsItem>. How to marge them?

I have found the marge() operator (RX-Java doc: Marge). But it does not seem to fit the use case.

In previous version I used CompletableFuture<List<NewsItem>> fetchNewsItems() but I wasn't able to fit it into Observable chain.

like image 960
atok Avatar asked Apr 16 '26 18:04

atok


1 Answers

Not sure if I understand the problem, but aren't you just looking for flatMap?

Observable
    .interval(timePerItem, TimeUnit.MILLISECONDS)
    .flatMap(i -> fetchItems(feed))
    .subscribe(result -> System.out.println(result));
like image 193
Samuel Gruetter Avatar answered Apr 18 '26 08:04

Samuel Gruetter