I am trying to rewrite a small piece of code with RxJava.
It is finished and it works,
but I want to know whether it is good Rx style.
Below is my API-handling code.
before
Random r = new Random();
boolean apiResult = r.nextBoolean(); // stands in for the API result, e.g. {"result": true} or {"result": false}
if (apiResult) {
    // do something
    System.out.println("result:" + "success");
} else {
    // do something
    System.out.println("result:" + "failure");
}
after
Random r = new Random();
Observable<Boolean> apiResultStream = Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> subscriber) {
        // emit true or false
        subscriber.onNext(r.nextBoolean());
    }
}).cache(1);
// I used filter to split the stream. Is it Rx style?
// success if true is emitted.
Observable<Boolean> successStream = apiResultStream
    .filter(aBoolean -> aBoolean == true); // here
// failure if false is emitted.
Observable<Boolean> failureStream = apiResultStream
    .filter(aBoolean -> aBoolean == false); // here
// success flow
successStream
    .flatMap(aBoolean -> Observable.just("success"))
    // and do something
    .subscribe(aString -> System.out.println("result:" + aString));
// failure flow
failureStream
    .flatMap(aBoolean -> Observable.just("failure"))
    // and do something.
    // I want to keep subscriber.
    .subscribe(aString -> System.out.println("result:" + aString));
EDIT
I have replaced almost everything. Thanks for the good comments.
(But a few parts are still not replaced. They have many callbacks and if statements.)
I want to avoid 'callback hell'.
The key point is that 'callSuccessApi' and 'callFailureApi' have different result types.
before rx
// callback hell!
callApi(new Callback<Result>() {
    @Override
    public void success(Result result) {
        if (result.response == true) {
            callSuccessApi(new Callback<ResultSuccess>() {
                @Override
                public void success(ResultSuccess result) {
                    // and more callbacks...
                }
            });
        } else { // result.response == false
            callFailureApi(new Callback<ResultFailure>() {
                @Override
                public void success(ResultFailure result) {
                    // and more callbacks...
                }
            });
        }
    }
});
after with rx (avoids callback hell! Is it good Rx style?)
// change the 1st api to an observable. (I changed the other apis to observables too.)
Observable<Result> apiResultStream = Observable.create(new OnSubscribe<Result>() {
    @Override
    public void call(Subscriber<? super Result> subscriber) {
        callApi(new Callback<Result>() {
            @Override
            public void success(Result result) {
                subscriber.onNext(result);
            }
        });
    }
}).cache(1); // ensure the same Observable<Result> for success and failure.
// I used filter to split the stream. Is it Rx style?
// success if result.response == true. (filter keeps the element type, so this is still Observable<Result>)
Observable<Result> successStream = apiResultStream
    .filter(result -> result.response == true); // here
// failure if result.response == false.
Observable<Result> failureStream = apiResultStream
    .filter(result -> result.response == false); // here
// success flow. callSuccessApi returns Observable<ResultSuccess>
successStream
    .flatMap(result -> callSuccessApi(result))
    // and more api calls with flatMap...
    .subscribe(resultSuccessN -> System.out.println("result:" + resultSuccessN.toString()));
// failure flow. callFailureApi returns Observable<ResultFailure>
failureStream
    .flatMap(result -> callFailureApi(result))
    // and more api calls with flatMap...
    .subscribe(resultFailureN -> System.out.println("result:" + resultFailureN.toString()));
Sorry for my poor English and the long question.
Updated My Code
I got two important pieces of information from this question. (Thank you, @Tomáš Dvořák and @Will.)
updated code
Observable<Result> apiResultStream = Observable.create(new OnSubscribe<Result>() {
    @Override
    public void call(Subscriber<? super Result> subscriber) {
        callApi(new Callback<Result>() {
            @Override
            public void success(Result result) {
                subscriber.onNext(result);
            }
        });
    }
});
// In this case, I used 'if' to keep the code simple and clean.
apiResultStream
    .subscribe(result -> {
        if (result.response == true) {
            callSuccessApi(); // this line looks like a 'callback', but I kept it for simplicity.
        } else {
            callFailureApi();
        }
    });
There are loads of ways of doing this and it really depends on your use case. In general I wouldn't want to split into 2 streams, as that makes your code less readable. Also, I'm not sure what benefit you get from the flatMap call. There's nothing wrong with doing if stuff within a map call.
Here are a few options:
1 - For adding logging (a bit like your print lines), I use doOnNext(). (doOnEach() also exists, but it passes a Notification wrapper rather than the value itself.)
apiResultStream
    .doOnNext(next -> {
        if (next) logger.info("Logging true " + next);
        else logger.info("Logging false " + next);
    })
    .subscribe(....
2 - The work you're doing is part of your stream, and you're going to want to do more work on the stream later - use map
apiResultStream
    .map(next -> {
        // map must return a value; this assumes both helpers return the mapped result
        if (next) return doSomeCallWithNextWhenTrue(next);
        else return doSomeCallwithNextWhenFalse(next);
    })
    .subscribe(...
3 - If this is work you want to do at the end of the pipeline, i.e. after all transformational or other stream-like work has completed, then do it in the subscribe call.
apiResultStream
    .subscribe(next -> {
        if (next) doSomeCallWithNextWhenTrue(next);
        else doSomeCallwithNextWhenFalse(next);
    });
The problem is that with such a simple use case, it's difficult to suggest the best option, but I appreciate that when learning Rx, working out how to do conditional statements can seem confusing. In general, I just use map (or flatMap when I'm calling another method that returns an Observable) and do my logic in there.
Update
Still not sure why you're splitting your streams. Unless you start getting clever with different threads, the first subscribe call is going to block the second, which is probably not what you want. Also, if you don't call subscribe more than once, then you don't need the cache() call.
There's nothing wrong with using an if statement within a map / flatMap / subscribe, especially if it makes your code more readable.
I would do the following:
apiResultStream
    .flatMap(result -> {
        if (result.response == true) {
            return callSuccessApi(result);
        } else {
            return callFailureApi(result);
        }
    })
    // Do any more calls you need
    .subscribe(...
So much cleaner.
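If you want to try the shape of this without pulling in RxJava, here is a rough sketch using only the JDK. It is an analogy, not RxJava code: CompletableFuture.thenCompose plays the role of flatMap, and BranchDemo, Result, callApi, callSuccessApi and callFailureApi are hypothetical placeholders that return canned values instead of calling a real API. Note that both branches have to return the same type, so in this sketch each one yields a String.

```java
import java.util.concurrent.CompletableFuture;

public class BranchDemo {
    // Hypothetical result type standing in for the question's Result.
    static final class Result {
        final boolean response;
        Result(boolean response) { this.response = response; }
    }

    // Hypothetical first API call; completes immediately with a canned value.
    static CompletableFuture<Result> callApi(boolean response) {
        return CompletableFuture.completedFuture(new Result(response));
    }

    // Hypothetical follow-up calls; both return the same type on purpose.
    static CompletableFuture<String> callSuccessApi(Result result) {
        return CompletableFuture.completedFuture("success");
    }

    static CompletableFuture<String> callFailureApi(Result result) {
        return CompletableFuture.completedFuture("failure");
    }

    // One chain, with the if/else inside the composing step.
    static CompletableFuture<String> handle(boolean response) {
        return callApi(response).thenCompose(result -> {
            if (result.response) {
                return callSuccessApi(result);
            } else {
                return callFailureApi(result);
            }
        });
    }

    public static void main(String[] args) {
        handle(true).thenAccept(s -> System.out.println("result:" + s));
        handle(false).thenAccept(s -> System.out.println("result:" + s));
    }
}
```

If the two follow-up calls really do return different types, as in your ResultSuccess / ResultFailure case, the chain needs a common supertype (or a mapping step in each branch) before anything downstream can treat them uniformly.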
I'm a bit confused by your System.out.println calls in subscribe. Is this there for debug or logging purposes? If so, just do that within the above flatMap in the if statement.
Hope this helps,
Will
To avoid the if/else and to not break the chain™, I like to use publish and merge to split and re-merge the stream:
apiResultStream
    .publish(results ->
        Observable.merge(
            results.filter(result -> result.response == true)
                .flatMap(result -> callSuccessApiObservable()),
            results.filter(result -> result.response == false)
                .flatMap(result -> callFailureApiObservable())
        )
    )
    .subscribe(...