I was reading a tutorial:
http://code.tutsplus.com/tutorials/getting-started-with-reactivex-on-android--cms-24387
which concerns RxAndroid in particular, but it's pretty much the same as in RxJava. I am not sure that I understood the concept completely.
Below I have written a method and then a sample usage.
My question is: is this the right way to implement my functions so that I can run them asynchronously on other threads? Each function would only return a newly created Observable that runs the real code and handles errors.
If this is wrong, I'd like to know the correct way.
Observable<String> googleSomething(final String text) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                String data = fetchData(text); // some normal method
                subscriber.onNext(data); // Emit the contents of the URL
                subscriber.onCompleted(); // Nothing more to emit
            } catch (Exception e) {
                subscriber.onError(e); // In case there are network errors
            }
        }
    });
}
googleSomething("hello world").subscribeOn(Schedulers.io()).observeOn(Schedulers.immediate()).subscribe(...)
Also, is Schedulers.immediate() used to execute the subscriber code on the current thread? The Javadoc says "Creates and returns a Scheduler that executes work immediately on the current thread.", but I'm not sure.
Unless you are more experienced and need a custom operator, or want to bridge a legacy addListener/removeListener-based API, you should not start with create. There are several questions on StackOverflow where the use of create was the source of trouble.
I'd prefer fromCallable, which lets you generate a single value or throw an Exception, so there is no need for those lengthy defer + just sources.
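A minimal sketch of the fromCallable variant, assuming an RxJava 1.x release that includes Observable.fromCallable is on the classpath. The class name FromCallableSketch and the body of fetchData are hypothetical stand-ins for the question's own code, not part of any library:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

import rx.Observable;
import rx.schedulers.Schedulers;

class FromCallableSketch {

    // Hypothetical stand-in for the question's fetchData(); a real one would do network I/O.
    static String fetchData(String text) throws IOException {
        return "results for " + text;
    }

    // The Callable runs once per subscriber; a thrown exception is delivered through onError().
    static Observable<String> googleSomething(final String text) {
        return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return fetchData(text);
            }
        });
    }

    public static void main(String[] args) {
        // toBlocking() is used here only so that the demo waits for the io() thread.
        String result = googleSomething("hello world")
                .subscribeOn(Schedulers.io())
                .toBlocking()
                .single();
        System.out.println(result);
    }
}
```

Because call() is declared with throws Exception, the checked IOException needs no try-catch inside the Callable.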
Schedulers.immediate() executes its task immediately on the caller's thread, which is the io() thread in your example, not the main thread. Currently, there is no support for moving the computation back to the Java main thread, as it requires blocking trampolining and is usually a bad idea anyway.
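The "caller's thread" point can be illustrated without RxJava. The following is a JDK-only analogy, not the RxJava implementation: a direct Executor stands in for Schedulers.immediate(), and a single named background thread stands in for Schedulers.io(). The names ImmediateAnalogy, IMMEDIATE and "io-thread" are made up for this sketch:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// JDK-only analogy (assumption: illustrative only, this is not how RxJava is implemented).
class ImmediateAnalogy {

    // Runs each task right away on whichever thread calls execute().
    static final Executor IMMEDIATE = Runnable::run;

    // Returns the name of the thread on which the "subscriber" code ended up running.
    static String observedThreadName() {
        // Stand-in for Schedulers.io(): one background thread with a recognizable name.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            Future<String> result = io.submit(() -> {
                String[] seen = new String[1];
                // The caller here is the background thread, so the task runs there too.
                IMMEDIATE.execute(() -> seen[0] = Thread.currentThread().getName());
                return seen[0];
            });
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("started on: " + Thread.currentThread().getName());
        System.out.println("subscriber code ran on: " + observedThreadName());
    }
}
```

In this sketch, the work handed to the immediate executor runs on the background thread because that thread made the call, which mirrors what happens when observeOn(Schedulers.immediate()) follows subscribeOn(Schedulers.io()).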
You should almost never use create(), especially not as a beginner. There are easier ways to create observables, and create() is difficult to implement correctly.
Most of the time, you can easily get around create() by using defer(). E.g., in this case you'd do:
Observable<String> googleSomething(final String text) {
    return Observable.defer(new Func0<Observable<String>>() {
        @Override
        public Observable<String> call() {
            try {
                return Observable.just(fetchData(text));
            } catch (IOException e) {
                return Observable.error(e);
            }
        }
    });
}
If you're not using a checked exception, then you could even get rid of the try-catch. RxJava will automatically forward any RuntimeException to the onError() part of the subscriber.