Efficient way to manipulate threads in RxJava

In my project I need to process objects on different threads. To switch threads mid-stream, I create new observables so that I can apply a different observeOn() to them, like this:

apiService.getObjects(token) // Retrofit
                .compose(bindToLifecycle())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(o -> {
                    // process in Main Thread
                })
                .map(Observable::just)  // create new one, to change thread for it
                .observeOn(Schedulers.io()) 
                .subscribe(o -> {
                    // process in the background thread
                });

But I think RxJava must have a more elegant and efficient way to process one response on different threads. I tried to google it, but I didn't find anything.

Thanks,
Anton

Anton Shkurenko asked Oct 20 '22 01:10


1 Answer

In Rx, it's usually recommended to avoid side effects in 'do' blocks (which are only executed once the stream is subscribed to) and to put that work in subscribe code instead.

In your case you can share a single query between two subscribers with cache() or publish()...connect(), e.g.:

query = apiService.getObjects(token)
            .compose(bindToLifecycle())
            .subscribeOn(Schedulers.io())
            .cache();

query.observeOn(AndroidSchedulers.mainThread())
            .subscribe(o -> {
                // process in Main Thread
            });
query.observeOn(Schedulers.io())
            .subscribe(o -> {
                // process in the background thread
            });

With publish() instead of cache(), the code is identical, but you decide when the query fires by connecting the stream: call query.connect() after wiring up the two subscriptions.
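The underlying idea (run the query once, then hand its single result to two consumers on two different threads) can be sketched without RxJava on the classpath. The snippet below is only a JDK analogy, not the RxJava API: a CompletableFuture plays the role of the cached query, and the two executors and their thread names ("main-like", "background") are hypothetical stand-ins for AndroidSchedulers.mainThread() and Schedulers.io().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SharedResultSketch {

    /** Runs one "query" and delivers its single result to two consumers on different threads. */
    static List<String> run() {
        // Hypothetical stand-ins for the main-thread and io schedulers.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            // The supplier runs once; the future keeps the result, roughly like cache().
            CompletableFuture<String> query = CompletableFuture.supplyAsync(() -> {
                log.add("query ran on " + Thread.currentThread().getName());
                return "response";
            }, background);

            // First consumer: handled on the main-like thread.
            CompletableFuture<Void> first = query.thenAcceptAsync(
                    o -> log.add(Thread.currentThread().getName() + " got " + o), main);
            // Second consumer: handled on the background thread.
            CompletableFuture<Void> second = query.thenAcceptAsync(
                    o -> log.add(Thread.currentThread().getName() + " got " + o), background);

            // Wait for both consumers before shutting the executors down.
            CompletableFuture.allOf(first, second).join();
        } finally {
            main.shutdown();
            background.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The order of the two "got response" entries is not guaranteed, since the consumers run concurrently; what the sketch is meant to show is that the query itself is logged only once.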

If your subscription work is background computation, Schedulers.computation() may be preferred over Schedulers.io().

Note that, AFAICT, your code will work just fine without the map(Observable::just) line, since an observeOn() call only affects the operators downstream of it (and not the 'do' statements before it).
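The "a thread switch only affects what comes after it" rule can be illustrated with a small JDK-only sketch. This is an analogy using CompletableFuture rather than RxJava's observeOn(), and the executors and thread names ("main-like", "io-like") are hypothetical: each stage runs on the executor chosen for it, and no re-wrapping of the value is needed to move the next stage to another thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DownstreamSwitchSketch {

    /** Chains two stages, each on its own thread, and records where each one ran. */
    static List<String> run() {
        // Hypothetical stand-ins for the main-thread and io schedulers.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-like"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // The "request" itself runs on the io-like thread.
                    .supplyAsync(() -> "response", io)
                    // Stage 1: loosely corresponds to observeOn(mainThread()) + doOnNext(...).
                    .thenApplyAsync(o -> {
                        log.add("stage1 on " + Thread.currentThread().getName());
                        return o;
                    }, main)
                    // Stage 2: loosely corresponds to observeOn(io()) + subscribe(...).
                    // Switching here does not move stage 1 off the main-like thread.
                    .thenAcceptAsync(o -> log.add("stage2 on " + Thread.currentThread().getName()), io)
                    .join();
        } finally {
            main.shutdown();
            io.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the stages are chained, stage 1 always completes before stage 2 starts, so the log order is stable here.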

Gluck answered Oct 23 '22 09:10