In this blog post, the author gives the following example (code copied below) of callback hell. However, the post does not mention how the issue can be eliminated by using Reactive Extensions.
So here F3 depends upon F1 completion and F4 and F5 depend upon F2 completion.
NOTE: I am currently trying to wrap my head around Rx so I didn't try solving this example before asking this question.
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    public class CallbackB {

        /**
         * Demonstration of nested callbacks which then need to composes their responses together.
         * <p>
         * Various different approaches for composition can be done but eventually they end up relying upon
         * synchronization techniques such as the CountDownLatch used here or converge on callback design
         * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
         */
        public static void run() throws Exception {
            final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
            /* the following are used to synchronize and compose the asynchronous callbacks */
            final CountDownLatch latch = new CountDownLatch(3);
            final AtomicReference<String> f3Value = new AtomicReference<String>();
            final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
            final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

            try {
                // get f3 with dependent result from f1
                executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                    @Override
                    public void call(String f1) {
                        executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                            @Override
                            public void call(String f3) {
                                // we have f1 and f3 now need to compose with others
                                System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                                // set to thread-safe variable accessible by external scope
                                f3Value.set(f3);
                                latch.countDown();
                            }

                        }, f1));
                    }

                }));

                // get f4/f5 after dependency f2 completes
                executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                    @Override
                    public void call(Integer f2) {
                        executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                            @Override
                            public void call(Integer f4) {
                                // we have f2 and f4 now need to compose with others
                                System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                                // set to thread-safe variable accessible by external scope
                                f4Value.set(f4);
                                latch.countDown();
                            }

                        }, f2));
                        executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                            @Override
                            public void call(Integer f5) {
                                // we have f2 and f5 now need to compose with others
                                System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                                // set to thread-safe variable accessible by external scope
                                f5Value.set(f5);
                                latch.countDown();
                            }

                        }, f2));
                    }

                }));

                /* we must wait for all callbacks to complete */
                latch.await();
                System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
            } finally {
                executor.shutdownNow();
            }
        }

        public static void main(String[] args) {
            try {
                run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private static final class CallToRemoteServiceA implements Runnable {

            private final Callback<String> callback;

            private CallToRemoteServiceA(Callback<String> callback) {
                this.callback = callback;
            }

            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call("responseA");
            }
        }

        private static final class CallToRemoteServiceB implements Runnable {

            private final Callback<Integer> callback;

            private CallToRemoteServiceB(Callback<Integer> callback) {
                this.callback = callback;
            }

            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(40);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call(100);
            }
        }

        private static final class CallToRemoteServiceC implements Runnable {

            private final Callback<String> callback;
            private final String dependencyFromA;

            private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
                this.callback = callback;
                this.dependencyFromA = dependencyFromA;
            }

            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call("responseB_" + dependencyFromA);
            }
        }

        private static final class CallToRemoteServiceD implements Runnable {

            private final Callback<Integer> callback;
            private final Integer dependencyFromB;

            private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
                this.callback = callback;
                this.dependencyFromB = dependencyFromB;
            }

            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(140);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call(40 + dependencyFromB);
            }
        }

        private static final class CallToRemoteServiceE implements Runnable {

            private final Callback<Integer> callback;
            private final Integer dependencyFromB;

            private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
                this.callback = callback;
                this.dependencyFromB = dependencyFromB;
            }

            @Override
            public void run() {
                // simulate fetching data from remote service
                try {
                    Thread.sleep(55);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.call(5000 + dependencyFromB);
            }
        }

        private static interface Callback<T> {
            public void call(T value);
        }
    }
I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.
It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.
    return getUser(userId).flatMap(user -> {
        Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
                .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                        video -> {
                            Observable<Bookmark> bookmark = getBookmark(video);
                            Observable<Rating> rating = getRatings(video);
                            Observable<VideoMetadata> metadata = getVideoMetadata(video);
                            return Observable.zip(bookmark, rating, metadata,
                                    (b, r, m) -> combineVideoData(video, b, r, m));
                        }));

        Observable<Map<String, Object>> social = getSocial(user).map(s -> {
            return s.getDataAsMap();
        });

        return Observable.merge(catalog, social);
    }).flatMap(data -> {
        String json = SimpleJson.mapToJson(data);
        return response.writeStringAndFlush("data: " + json + "\n");
    });
This example can be seen in context of a functioning application at https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33
Since I can't possibly provide all of the information here you can also find an explanation in presentation form (with link to video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.
Going by your code, suppose that the remote calls are made using Observable:
    Observable<Integer> callRemoveServiceA() { /* async call */ }
    /* .... */
    Observable<Integer> callRemoveServiceE(Integer f2) { /* async call */ }
What you want:
- call serviceA, then call serviceB with the result of serviceA
- call serviceC, then call serviceD and serviceE with the result of serviceC
- with the results of serviceE and serviceD, build a new value
- combine that new value with the result of serviceB

With RxJava, you can achieve this with the following code:
    Observable<Integer> f3 = callRemoveServiceA() // call serviceA
            // call serviceB with the result of serviceA
            .flatMap((f1) -> callRemoveServiceB(f1));

    Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
            // call serviceD and serviceE then build a new value
            .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));

    // compute the string to display from f3, and the f4, f5 pair
    f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
            // display the value
            .subscribe(System.out::println);
The important part here is the use of flatMap and zip (or zipWith).

flatMap transforms a value into another Observable. Here, that Observable is your next asynchronous call. (http://reactivex.io/documentation/operators/flatmap.html)

zip composes a new value from two different Observables, so you can build a new value from the results of two (or more) Observables. (http://reactivex.io/documentation/operators/zip.html)

You can find more information on flatMap here: When do you use map vs flatMap in RxJava?
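If you want to try the flatMap/zip shape without adding RxJava to the classpath, here is a rough sketch using the JDK's CompletableFuture instead. This is not Rx: it is a stand-in technique where thenCompose plays the role of flatMap for a single value and thenCombine plays the role of zip. The class name ComposeSketch and the methods callA through callE are hypothetical; they only mimic the values returned by the five simulated services in the question (F1 feeds F3, F2 feeds F4 and F5).

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: JDK CompletableFuture used as a stand-in for Observable.
final class ComposeSketch {

    // Hypothetical stand-ins for the question's five simulated remote calls.
    static CompletableFuture<String> callA() {
        return CompletableFuture.supplyAsync(() -> "responseA");
    }

    static CompletableFuture<Integer> callB() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    static CompletableFuture<String> callC(String f1) {
        // Same prefix as CallToRemoteServiceC in the question.
        return CompletableFuture.supplyAsync(() -> "responseB_" + f1);
    }

    static CompletableFuture<Integer> callD(Integer f2) {
        return CompletableFuture.supplyAsync(() -> 40 + f2);
    }

    static CompletableFuture<Integer> callE(Integer f2) {
        return CompletableFuture.supplyAsync(() -> 5000 + f2);
    }

    static CompletableFuture<String> compose() {
        // f1 -> f3: chain a dependent async call (the flatMap role).
        CompletableFuture<String> f3 = callA().thenCompose(ComposeSketch::callC);

        // f2 -> f4 and f5: start both once f2 arrives, then pair the results (the zip role).
        CompletableFuture<Integer> f4TimesF5 = callB().thenCompose(f2 ->
                callD(f2).thenCombine(callE(f2), (f4, f5) -> f4 * f5));

        // Final pairing of the two independent branches.
        return f3.thenCombine(f4TimesF5, (s, n) -> s + " => " + n);
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program.
        System.out.println(compose().join()); // prints "responseB_responseA => 714000"
    }
}
```

Note that there is no CountDownLatch and no AtomicReference: the dependencies are expressed by the chaining itself, which is the same benefit the Rx version gives you.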