In my project I need to process objects in different threads. To manipulate stream's behaviour I create new observables to change their observeOn()
this way:
apiService.getObjects(token) // Retrofit
.compose(bindToLifecycle())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(o -> {
// process in Main Thread
})
.map(Observable::just) // create new one, to change thread for it
.observeOn(Schedulers.io())
.subscribe(o -> {
// process in the background thread
});
But I think in RxJava there is much more beautiful and efficient way to process one response in different threads. I tried to google it, but I didn't find anything.
Thanks,
Anton
In Rx, it's usually recommended to avoid side effects in 'do' blocks (which will only be executed if the stream gets subscribed on), and prefer subscribe code.
In your case you can leverage cache()
or publish()...connect()
, e.g:
query = apiService.getObjects(token)
.compose(bindToLifecycle())
.subscribeOn(Schedulers.io())
.cache();
query.observeOn(AndroidSchedulers.mainThread())
.subscribe(o -> {
// process in Main Thread
})
query.observeOn(Schedulers.io())
.subscribe(o -> {
// process in the background thread
});
With publish()
instead of cache()
, the code is identical but you can decide when to fire your query by connecting the stream (you call query.connect()
after wiring up the 2 subscriptions).
If your subscription work is background computation, Schedulers.computation()
may be preferred over Schedulers.io()
.
Note that AFAICT your code will work just fine without the map(Observable::just)
line, as 'observeOn' statements only impact the stream further down (and not previous 'do' statements)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With