
RxJava Fetching Observables In Parallel

I need some help in implementing parallel asynchronous calls in RxJava. I have picked up a simple use case wherein the FIRST call fetches (rather searches) a list of products (Tile) to be displayed. The subsequent calls go out and fetch (A) REVIEWS and (B) PRODUCT IMAGES

After several attempts I got to this place.

     1    Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
     2    List<Tile> allTiles = new ArrayList<Tile>();
     3    ClientResponse response = new ClientResponse();

     4    searchTile.parallel(oTile -> {
     5      return oTile.flatMap(t -> {
     6        Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
     7        Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());

     8        return Observable.zip(reviews, imageUrl, (r, u) -> {
     9          t.setReviews(r);
    10          t.setImageUrl(u);

    11          return t;
    12        });

    13      });
    14    }).subscribe(e -> {
    15      allTiles.add((Tile) e);
    16    });

Line 1: goes out and fetches the product (Tile) to be displayed

Line 4: We take the list of the Observable and SHARD it to fetch reviews and imageUrls

Lines 6, 7: Fetch the Observable review and Observable url

Line 8: Finally the 2 observables are zipped up to return an updated Observable

Line 15: Collates all the individual products to be displayed into a collection which can be returned to the calling layer

While the Observable has been sharded and, in our tests, runs over 4 different threads, the fetching of reviews and images seems to happen one after another. I suspect that the zip step on line 8 is causing the sequential invocation of the 2 observables (reviews and url).

[Image: waterfall chart of the review and image calls]

Does this group have any suggestions for fetching reviews and image urls in parallel? In essence, the waterfall chart attached above should look more vertically stacked: the calls to reviews and images should run in parallel.

thanks anand raman

asked Oct 08 '14 04:10 by diduknow





1 Answer

The parallel operator proved to be a problem for almost all use cases and does not do what most expect from it, so it was removed in the 1.0.0.rc.4 release: https://github.com/ReactiveX/RxJava/pull/1716

A good example of how to do this type of behavior and get parallel execution can be seen here.

In your example code it is unclear whether searchServiceClient is synchronous or asynchronous. This slightly affects how to solve the problem: if it is already asynchronous, no extra scheduling is needed; if it is synchronous, extra scheduling is needed.
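To make the synchronous/asynchronous distinction concrete without depending on any particular client, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in (it is not RxJava API). The class name SyncVsAsyncSketch and the method blockingLookup are hypothetical; the lookup simply reports which thread executed it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncVsAsyncSketch {

    // Hypothetical stand-in for a blocking client call: returns the name of the thread that ran it.
    static String blockingLookup() {
        return Thread.currentThread().getName();
    }

    // Synchronous: the work runs on the caller's thread before this method returns.
    static String callSync() {
        return blockingLookup();
    }

    // Asynchronous: the work is handed to the executor and the caller gets a future back immediately.
    static CompletableFuture<String> callAsync(ExecutorService io) {
        return CompletableFuture.supplyAsync(SyncVsAsyncSketch::blockingLookup, io);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            System.out.println("sync ran on:  " + callSync());
            System.out.println("async ran on: " + callAsync(io).join());
        } finally {
            io.shutdown();
        }
    }
}
```

A client that behaves like callSync occupies the subscribing thread, which is the case where extra scheduling is needed.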

First here are some simple examples showing synchronous and asynchronous behavior:

    import rx.Observable;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;

    public class ParallelExecution {

        public static void main(String[] args) {
            System.out.println("------------ mergingAsync");
            mergingAsync();
            System.out.println("------------ mergingSync");
            mergingSync();
            System.out.println("------------ mergingSyncMadeAsync");
            mergingSyncMadeAsync();
            System.out.println("------------ flatMapExampleSync");
            flatMapExampleSync();
            System.out.println("------------ flatMapExampleAsync");
            flatMapExampleAsync();
            System.out.println("------------");
        }

        private static void mergingAsync() {
            Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
        }

        private static void mergingSync() {
            // here you'll see the delay as each is executed synchronously
            Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
        }

        private static void mergingSyncMadeAsync() {
            // if you have something synchronous and want to make it async, you can schedule it like this
            // so here we see both executed concurrently
            Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
        }

        private static void flatMapExampleAsync() {
            Observable.range(0, 5).flatMap(i -> {
                return getDataAsync(i);
            }).toBlocking().forEach(System.out::println);
        }

        private static void flatMapExampleSync() {
            Observable.range(0, 5).flatMap(i -> {
                return getDataSync(i);
            }).toBlocking().forEach(System.out::println);
        }

        // artificial representations of IO work
        static Observable<Integer> getDataAsync(int i) {
            return getDataSync(i).subscribeOn(Schedulers.io());
        }

        static Observable<Integer> getDataSync(int i) {
            return Observable.create((Subscriber<? super Integer> s) -> {
                // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                s.onNext(i);
                s.onCompleted();
            });
        }
    }

Following is an attempt at providing an example that more closely matches your code:

    import java.util.List;

    import rx.Observable;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;

    public class ParallelExecutionExample {

        public static void main(String[] args) {
            final long startTime = System.currentTimeMillis();

            Observable<Tile> searchTile = getSearchResults("search term")
                    .doOnSubscribe(() -> logTime("Search started ", startTime))
                    .doOnCompleted(() -> logTime("Search completed ", startTime));

            Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
                Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                        .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
                Observable<String> imageUrl = getProductImage(t.getProductId())
                        .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));

                return Observable.zip(reviews, imageUrl, (r, u) -> {
                    return new TileResponse(t, r, u);
                }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
            });

            List<TileResponse> allTiles = populatedTiles.toList()
                    .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
                    .toBlocking().single();
        }

        private static Observable<Tile> getSearchResults(String string) {
            return mockClient(new Tile(1), new Tile(2), new Tile(3));
        }

        private static Observable<Reviews> getSellerReviews(int id) {
            return mockClient(new Reviews());
        }

        private static Observable<String> getProductImage(int id) {
            return mockClient("image_" + id);
        }

        private static void logTime(String message, long startTime) {
            System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
        }

        private static <T> Observable<T> mockClient(T... ts) {
            return Observable.create((Subscriber<? super T> s) -> {
                // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                }
                for (T t : ts) {
                    s.onNext(t);
                }
                s.onCompleted();
            }).subscribeOn(Schedulers.io());
            // note the use of subscribeOn to make an otherwise synchronous Observable async
        }

        public static class TileResponse {

            public TileResponse(Tile t, Reviews r, String u) {
                // store the values
            }

        }

        public static class Tile {

            private final int id;

            public Tile(int i) {
                this.id = i;
            }

            public int getSellerId() {
                return id;
            }

            public int getProductId() {
                return id;
            }

        }

        public static class Reviews {

        }
    }

This outputs:

    Search started  => 65ms
    Search completed  => 1094ms
    getProductImage[1] completed  => 2095ms
    getSellerReviews[2] completed  => 2095ms
    getProductImage[3] completed  => 2095ms
    zip[1] completed  => 2096ms
    zip[2] completed  => 2096ms
    getProductImage[2] completed  => 2096ms
    getSellerReviews[1] completed  => 2096ms
    zip[3] completed  => 2096ms
    All Tiles Completed  => 2097ms
    getSellerReviews[3] completed  => 2097ms

I have simulated each IO call to take 1000ms, so it is obvious where the latency is and that the calls are happening in parallel. The program prints its progress in elapsed milliseconds.

The trick here is that flatMap merges async calls, so as long as the Observables being merged are async, they will all be executed concurrently.
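The same principle can be checked outside RxJava. The following is a minimal sketch using the JDK's CompletableFuture as a stand-in, where thenCombine plays the role of zip; it is not the RxJava API, and the names ConcurrentZipSketch, blockingCall, and fetchAll are hypothetical. Each simulated call waits on a shared latch until every other call is in flight, so the program can only finish if the calls genuinely overlap.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ConcurrentZipSketch {

    // Hypothetical blocking call. It waits until all expected calls have started,
    // so it fails (after a timeout) if the calls were executed one after another.
    static String blockingCall(String label, CountDownLatch inFlight) {
        inFlight.countDown();
        try {
            if (!inFlight.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException(label + ": calls did not overlap");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return label;
    }

    static List<String> fetchAll(List<Integer> tileIds) {
        // Two calls per tile, all expected to be in flight at the same time.
        CountDownLatch inFlight = new CountDownLatch(tileIds.size() * 2);
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            // Start every call before waiting on any of them.
            List<CompletableFuture<String>> perTile = tileIds.stream()
                .map(id -> {
                    CompletableFuture<String> reviews =
                        CompletableFuture.supplyAsync(() -> blockingCall("reviews_" + id, inFlight), io);
                    CompletableFuture<String> image =
                        CompletableFuture.supplyAsync(() -> blockingCall("image_" + id, inFlight), io);
                    // Pair the two results once both are available (the zip step).
                    return reviews.thenCombine(image, (r, u) -> "tile_" + id + "[" + r + "," + u + "]");
                })
                .collect(Collectors.toList());
            // Block until every tile is populated, keeping the input order.
            return perTile.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        fetchAll(Arrays.asList(1, 2, 3)).forEach(System.out::println);
    }
}
```

If the two suppliers were invoked directly on the calling thread instead of through supplyAsync, the first call would wait on the latch alone and time out, which mirrors what happens when synchronous Observables are zipped.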

If a call like getProductImage(t.getProductId()) were synchronous, it could be made asynchronous like this: getProductImage(t.getProductId()).subscribeOn(Schedulers.io()).

Here is the important part of the ParallelExecutionExample code above, without all the logging and boilerplate types:

    Observable<Tile> searchTile = getSearchResults("search term");

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
        Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
        Observable<String> imageUrl = getProductImage(t.getProductId());

        return Observable.zip(reviews, imageUrl, (r, u) -> {
            return new TileResponse(t, r, u);
        });
    });

    List<TileResponse> allTiles = populatedTiles.toList()
            .toBlocking().single();

I hope this helps.

answered Sep 29 '22 11:09 by benjchristensen
