
RxJava async task in Android

I am trying to implement an asynchronous task using RxJava in Android. The following code does not work as expected: the method executes on the UI thread. I am using RxAndroid 0.24.0.

try {
    Observable.just(someMethodWhichThrowsException())
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> onMergeComplete());
}
catch (IOException e) {
    e.printStackTrace();
}

However, the following works asynchronously for me.

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            someMethodWhichThrowsException();
        } catch (IOException e) {
            e.printStackTrace();
        }

        subscriber.onCompleted();
    }
});
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

I am trying to understand the following:

  1. What is the difference between them?
  2. What is the best practice while creating async tasks?
muneikh asked Apr 16 '15 09:04


2 Answers

  1. What is the difference between them?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())

This is equivalent to the following:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())

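As you can see, the synchronous method call runs first, on whichever thread executes this statement (here, the UI thread), and only its result is passed to Observable.just to become an Observable. subscribeOn affects only how that already-computed value is emitted, which is also why the IOException has to be caught around the whole statement.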
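The ordering is ordinary Java argument evaluation, so it can be reproduced without RxJava. Here is a minimal sketch in plain Java; work() is a hypothetical stand-in for someMethodWhichThrowsException(), and wrapValue is a toy stand-in for Observable.just, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerArgument {
    // Records the order in which things happen.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for someMethodWhichThrowsException().
    static String work() {
        events.add("work on " + Thread.currentThread().getName());
        return "result";
    }

    // Toy stand-in for Observable.just: it only ever receives a finished value.
    static <T> Supplier<T> wrapValue(T value) {
        events.add("wrapValue");
        return () -> value;
    }

    static List<String> run() {
        events.clear();
        // work() is evaluated here, on the current thread, before wrapValue is entered.
        Supplier<String> wrapped = wrapValue(work());
        // Reading the value later does not run work() again.
        wrapped.get();
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded list has the work entry first, tagged with the caller's thread name, followed by wrapValue. The wrapper never gets a chance to move the work elsewhere because it is finished before the wrapper exists.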

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

This version, however, runs the code in the call block at subscription time. You've told it you want to subscribe on a new thread (subscribeOn(Schedulers.newThread())), so the subscription happens on that thread, and the code which runs on subscription (the call block) runs there too. Observable.defer behaves similarly.
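To make "runs on subscription, on the subscribing thread" concrete, here is a toy model in plain Java. ColdTask and its methods are hypothetical names made up for this sketch. It is not how RxJava implements Observable.create or subscribeOn, only the same idea reduced to a stored body and an executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ColdTaskDemo {
    // Toy cold source: it stores the body and runs it once per subscription.
    static final class ColdTask<T> {
        private final Supplier<T> body;

        ColdTask(Supplier<T> body) {
            this.body = body;
        }

        // Subscribing directly runs the body on the calling thread.
        T subscribe() {
            return body.get();
        }

        // Subscribing through an executor runs the body on one of its threads.
        CompletableFuture<T> subscribeOn(Executor executor) {
            return CompletableFuture.supplyAsync(body, executor);
        }
    }

    // Creating the task runs nothing: the counter is still 0 afterwards.
    static int runsBeforeSubscribe() {
        AtomicInteger runs = new AtomicInteger();
        ColdTask<Integer> task = new ColdTask<>(runs::incrementAndGet);
        return runs.get();
    }

    static String threadWhenSubscribedDirectly() {
        ColdTask<String> task = new ColdTask<>(() -> Thread.currentThread().getName());
        return task.subscribe();
    }

    static String threadWhenSubscribedOnWorker() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            ColdTask<String> task = new ColdTask<>(() -> Thread.currentThread().getName());
            return task.subscribeOn(worker).join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs before subscribe: " + runsBeforeSubscribe());
        System.out.println("direct subscribe ran on: " + threadWhenSubscribedDirectly());
        System.out.println("worker subscribe ran on: " + threadWhenSubscribedOnWorker());
    }
}
```

The body is passed as a value and invoked later, so whoever triggers the subscription decides which thread it runs on. That is the property the Observable.create version has and the Observable.just version lacks.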

  2. What is the best practice while creating async tasks?

Well, that's up to you and your desired behaviour. Sometimes you want the async code to begin running immediately, in which case you may want to cache its result using one of the operators for that purpose. I'd definitely consider using the Async Utils library for this.

Other times, you'll want it to run only on subscription (which is the behaviour in the examples here) - for example if there are side-effects, or if you don't care when it's run and just want to use the built-ins to get something off the UI thread. Dan Lew mentions that Observable.defer is very handy for taking old code and getting it off the UI thread, during a conversion to Rx.

Adam S answered Nov 09 '22 04:11


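Use Async.start() from the RxJava Async Utils library. It calls the function you provide on another thread. The function is invoked immediately and only once, not on each subscription, and every subscriber observes the same result.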

Example:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});

Note that Func0.call() does not declare any checked exceptions, so a checked exception such as IOException must be wrapped in a RuntimeException.
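If you do this wrapping in several places, it can be pulled into a small adapter. This is a minimal sketch in plain Java, using java.util.function.Supplier in place of Func0 so that it compiles without RxJava. The names unchecked, failingRead and causeOfFailure are hypothetical and exist only for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

class UncheckedWrap {
    // Adapts a Callable (may throw checked exceptions) to a Supplier (may not).
    static <T> Supplier<T> unchecked(Callable<T> task) {
        return () -> {
            try {
                return task.call();
            } catch (RuntimeException e) {
                throw e; // already unchecked, rethrow as-is
            } catch (Exception e) {
                throw new RuntimeException(e); // keep the original as the cause
            }
        };
    }

    // Hypothetical stand-in for someMethodWhichThrowsException().
    static String failingRead() throws IOException {
        throw new IOException("disk unavailable");
    }

    // Returns the original exception recovered from the wrapper, or null on success.
    static Throwable causeOfFailure() {
        try {
            unchecked(UncheckedWrap::failingRead).get();
            return null;
        } catch (RuntimeException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure());
    }
}
```

Because the original exception is kept as the cause, whoever handles the error downstream can still call getCause() and see the IOException.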

See also https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start

clemp6r answered Nov 09 '22 06:11