Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava - When to use Observable with create method

I was reading a tutorial:

http://code.tutsplus.com/tutorials/getting-started-with-reactivex-on-android--cms-24387

which concers RxAndroid in particular but it's pretty much the same as in RxJava. I am not sure that I understood the concept completely.

Below I have written a method and then a sample usage.

My question is: is this the right way to implement my functions so that I can run them on other threads asynchronously? They will in fact only return a created Observable running the real code, and handling errors and all that stuff.

Or is this wrong, then I'd like to know the correct way.

Observable<String> googleSomething(String text){
    return Observable.create(new Observable(){
        @Override
        public void call(Subscriber<? super String> subscriber) {
             try {
                String data = fetchData(text); // some normal method
                subscriber.onNext(data); // Emit the contents of the URL
                subscriber.onCompleted(); // Nothing more to emit
            } catch(Exception e) {
                subscriber.onError(e); // In case there are network errors
            }
        }
    });
}

googleSomething("hello world").subscribeOn(Schedulers.io()).observeOn(Schedulers.immediate()).subscribe(...)

Also is Schedulers.immediate() used in order to execute the subscriber code on the current thread? It says "Creates and returns a Scheduler that executes work immediately on the current thread." in javadoc, but I'm not sure.

like image 374
Greyshack Avatar asked Dec 10 '22 17:12

Greyshack


2 Answers

Unless you are more experienced and need a custom operator or want to bridge a legacy addListener/removeListener based API you should not start with create. There are several questions on StackOverflow which used create and was the source of trouble.

I'd prefer fromCallable which let's you generate a single value or throw an Exception thus no need for those lengthy defer + just sources.

Schedulers.immediate() executes its task immediately on the caller's thread, which is the io() thread in your example, not the main thread. Currently, there is no support for moving back the computation to the Java main thread as it requires blocking trampolining and usually a bad idea anyway.

like image 94
akarnokd Avatar answered Feb 21 '23 21:02

akarnokd


You should almost never use create(), especially not as a beginner. There are easier ways to create observables, and create() is difficult to implement correctly.

Most of the time, you can easily get around create() by using defer(). E.g., in this case you'd do:

Observable<String> googleSomething(String text) {
  return Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
      try {
        return Observable.just(fetchData(text));
      } catch (IOException e) {
        return Observable.error(e);
      }
    }
  });
}

If you're not using a checked exception, then you could even get rid of the try-catch. RxJava will automatically forward any RuntimeException to the onError() part of the subscriber.

like image 38
Dan Lew Avatar answered Feb 21 '23 21:02

Dan Lew