I'm learning RxJava and, as my first experiment, trying to rewrite the code in the first run()
method in this code (cited on Netflix's blog as a problem RxJava can help solve) to improve its asynchronicity using RxJava, i.e. so it doesn't wait for the result of the first Future (f1.get()
) before proceeding on to the rest of the code.
f3
depends on f1
. I see how to handle this, flatMap
seems to do the trick:
Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
}
});
Next, f4
and f5
depend on f2
. I have this:
final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer i) {
Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
return Observable.merge(f4Observable, f5Observable);
}
});
Which starts to get weird (merge
ing them probably isn't what I want...) but allows me to do this at the end, not quite what I want:
f3Observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Observed from f3: " + s);
f4And5Observable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("Observed from f4 and f5: " + i);
}
});
}
});
That gives me:
Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100
which is all the numbers, but unfortunately I get the results in separate invocations, so I can't quite replace the final println in the original code:
System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
I don't understand how to get access to both those return values on the same line. I think there's probably some functional programming fu I'm missing here. How can I do this? Thanks.
It looks like all you really need is a bit more encouragement and perspective on how RX is used. I'd suggest you read more into the documentation as well as marble diagrams (I know they're not always useful). I also suggest looking into the lift()
function and operators.
map
, flatMap
and filter
are to manipulate the data in your data flowThe point of operators are to allow you to disrupt a steady stream of observables and define your own operations on a data flow. For example, I coded a moving average operator. That sums up n
double
s in an Observable of doubles to return a stream of moving averages. The code literally looked like this
Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))
You'll be a relieved that a lot of the filtering methods that you take for granted all have lift()
under the hood.
With that said; all it takes to merge multiple dependencies is:
map
or flatMap
Edit: someone converted the following text, which I had added as an edit on the question, into an answer, which I appreciate, and understand may be the proper SO thing to do, however I do not consider this an answer because it's clearly not the right way to do it. I would not ever use this code nor would I advise anyone to copy it. Other/better solutions and comments welcome!
I was able to solve this with the following. I didn't realize you could flatMap
an observable more than once, I assumed results could only be consumed once. So I just flatMap
f2Observable twice (sorry, I renamed some stuff in the code since my original post), then zip
on all the Observables, then subscribe to that. That Map
in the zip
to aggregate the values is undesirable because of the type juggling. Other/better solutions and comments welcome! The full code is viewable in a gist. Thank you.
Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("Observed from f2: " + integer);
Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
return Observable.from(f4);
}
});
Observable<Integer> f5Observable = f2Observable
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("Observed from f2: " + integer);
Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
return Observable.from(f5);
}
});
Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
@Override
public Map<String, String> call(String s, Integer integer, Integer integer2) {
Map<String, String> map = new HashMap<String, String>();
map.put("f3", s);
map.put("f4", String.valueOf(integer));
map.put("f5", String.valueOf(integer2));
return map;
}
}).subscribe(new Action1<Map<String, String>>() {
@Override
public void call(Map<String, String> map) {
System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
}
});
And this yields me the desired output:
responseB_responseA => 714000
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With