
How to compose Observables to avoid the given nested and dependent callbacks?

In this blog post, the author gives the following example of callback hell (code copied verbatim below). However, the post does not mention how the issue can be eliminated by using Reactive Extensions.

Here F3 depends on the completion of F1, and F4 and F5 depend on the completion of F2.

  1. What would be the functional equivalent in Rx?
  2. How to represent in Rx that F1, F2, F3, F4 and F5 should all be pulled asynchronously?

NOTE: I am currently trying to wrap my head around Rx so I didn't try solving this example before asking this question.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to composes their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}
Aravind Yarram asked Feb 09 '15 03:02


2 Answers

I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.

It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.

return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});

This example can be seen in the context of a functioning application at https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33

Since I can't possibly provide all of the information here you can also find an explanation in presentation form (with link to video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.

benjchristensen answered Sep 22 '22 19:09


Going by your code, suppose that the remote calls are exposed as Observables:

Observable<String> callRemoteServiceA() { /* async call */ }
/* .... */
Observable<Integer> callRemoteServiceE(Integer f2) { /* async call */ }

What you want:

  • call serviceA, then call serviceC with the result of serviceA
  • call serviceB, then call serviceD and serviceE with the result of serviceB
  • with the results of serviceD and serviceE, build a new value
  • display the new value together with the result of serviceC

With RxJava, you can achieve this with the following code:

Observable<String> f3 = callRemoteServiceA() // call serviceA
        // call serviceC with the result of serviceA
        .flatMap(f1 -> callRemoteServiceC(f1));

Observable<Integer> f4Andf5 = callRemoteServiceB() // call serviceB
        // call serviceD and serviceE, then build a new value
        .flatMap(f2 -> callRemoteServiceD(f2).zipWith(callRemoteServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3 and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
        // display the value
        .subscribe(System.out::println);
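For readers who want to run the dependency graph from the question without adding RxJava to the classpath, here is a rough sketch that swaps in the JDK's CompletableFuture. This is not RxJava: thenCompose stands in for flatMap and thenCombine for zip, which is only an analogy for single-valued results. The class name ComposeSketch, the serviceA to serviceE methods, and the pause helper are all hypothetical stand-ins that mimic the delays and return values of the question's CallToRemoteService classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: CompletableFuture used as a stand-in for Observable.
final class ComposeSketch {

    // Simulated remote calls, mirroring the delays and values in the question.
    static CompletableFuture<String> serviceA() {
        return CompletableFuture.supplyAsync(() -> { pause(100); return "responseA"; });
    }

    static CompletableFuture<Integer> serviceB() {
        return CompletableFuture.supplyAsync(() -> { pause(40); return 100; });
    }

    static CompletableFuture<String> serviceC(String f1) {
        return CompletableFuture.supplyAsync(() -> { pause(60); return "responseB_" + f1; });
    }

    static CompletableFuture<Integer> serviceD(Integer f2) {
        return CompletableFuture.supplyAsync(() -> { pause(140); return 40 + f2; });
    }

    static CompletableFuture<Integer> serviceE(Integer f2) {
        return CompletableFuture.supplyAsync(() -> { pause(55); return 5000 + f2; });
    }

    // Sleeps to simulate network latency, restoring the interrupt flag if interrupted.
    static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static CompletableFuture<String> compose() {
        // f3 depends on f1: thenCompose plays the role of flatMap.
        CompletableFuture<String> f3 = serviceA().thenCompose(ComposeSketch::serviceC);

        // f4 and f5 both depend on f2: thenCombine plays the role of zip.
        CompletableFuture<Integer> f4TimesF5 = serviceB().thenCompose(f2 ->
                serviceD(f2).thenCombine(serviceE(f2), (f4, f5) -> f4 * f5));

        // Join the two independent branches into the final string.
        return f3.thenCombine(f4TimesF5, (s, n) -> s + " => " + n);
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program.
        System.out.println(compose().join()); // prints "responseB_responseA => 714000"
    }
}
```

No latch or AtomicReference is needed because each combinator carries its dependency explicitly; the only blocking call is the final join().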

The important part here is the use of flatMap and zip (or zipWith):

  • flatMap transforms each emitted value into another Observable, which here is your next asynchronous call, and flattens its emissions into the resulting stream. ( http://reactivex.io/documentation/operators/flatmap.html )
  • zip composes a new value from the emissions of different Observables, so you can build a new value from the results of two (or more) Observables. ( http://reactivex.io/documentation/operators/zip.html )

You can get more info on flatMap here: When do you use map vs flatMap in RxJava?
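The map versus flatMap distinction can be tried out with only the JDK, again using CompletableFuture as a stand-in rather than RxJava itself: thenApply behaves like map and thenCompose like flatMap. The sketch below uses hypothetical names (MapVsFlatMapSketch, lengthAsync) purely for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: thenApply ~ map, thenCompose ~ flatMap.
final class MapVsFlatMapSketch {

    // An asynchronous function: it returns a future, not a plain value.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // map-like: a synchronous function applied to the eventual value.
    static CompletableFuture<String> mapped(CompletableFuture<String> source) {
        return source.thenApply(String::toUpperCase);
    }

    // map-like with an asynchronous function: the result is a nested future.
    static CompletableFuture<CompletableFuture<Integer>> nested(CompletableFuture<String> source) {
        return source.thenApply(MapVsFlatMapSketch::lengthAsync);
    }

    // flatMap-like: the inner future is flattened into the outer one.
    static CompletableFuture<Integer> flattened(CompletableFuture<String> source) {
        return source.thenCompose(MapVsFlatMapSketch::lengthAsync);
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.completedFuture("flatMap");
        System.out.println(mapped(source).join());        // prints "FLATMAP"
        System.out.println(nested(source).join().join()); // two joins needed; prints 7
        System.out.println(flattened(source).join());     // one join; prints 7
    }
}
```

The nested variant is what you end up with when map is used with a function that itself starts an asynchronous call, which is why flatMap is the operator to reach for when chaining dependent calls.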

dwursteisen answered Sep 18 '22 19:09
