Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to observeOn the calling thread in java rx?

is there anyway to tell java rx to use the current thread in the observeOn function? I am writing code for the android syncadapter and I want the results be observed in the sync adapter thread and not in the main thread.

An example network call with Retrofit + RX Java looks something like that:

MyRetrofitApi.getInstance().getObjects()
.subscribeOn(Schedulers.io())
.observeOn(<current_thread>)
.subscribe(new Subscriber<Object>() {
    //do stuff on the sync adapter thread

}

I tried using using

...
.observeOn(AndroidSchedulers.handlerThread(new Handler(Looper.myLooper())))
...

which is the same way android rx creates the scheduler for the main thread but doesn't work anymore as soon as I substitute Looper.myLooper() for Looper.getMainLooper().

I could use the Schedulers.newThread() but as its complex syncing code with a lot of server calls I would be constantly creating a new thread just to fire new network calls that again create new threads to to launch more network calls. Is there a way to do this? Or is my approach itself completly wrong?

like image 879
tiqz Avatar asked Nov 20 '15 08:11

tiqz


People also ask

What is observeOn in RxJava?

observeOn() observeOn controls which schedulers is used in downstream subscribers. Any code inside create is controlled by subscribeOn, now we might need to do a task in a background thread and update the UI thread after this. This is the most common use case in android development, observeOn comes to the rescue.

What is observeOn and subscribeOn?

The first subscribeOn changes the thread to IO thread and though the subsequent subscribeOn tries to change the thread to computation thread, it is rendered obsolete by the subscribeOn preceding it. On the contrary observeOn is able to change threads in subsequent calls as evident from the output of the logs.

Is RxJava single threaded?

2. Default Threading Behavior. By default, Rx is single-threaded which implies that an Observable and the chain of operators that we can apply to it will notify its observers on the same thread on which its subscribe() method is called.

What is a disposable in RxJava?

Disposables are streams/links/connections between Observer and Observable . They're meant to give power to the Observers to control Observables . In most cases, the Observer is a UI element, ie; Fragment , Activity , or a corresponding viewmodel of those elements.


1 Answers

Try Using Schedulers.immediate()

MyRetrofitApi.getInstance().getObjects()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.subscribe(new Subscriber<Object>() {
    //do stuff on the sync adapter thread

}

Its description says: Creates and returns a Scheduler that executes work immediately on the current thread.

NOTE:
I think it is okay to keep all the work on the SyncAdapter's thread because it is already using a different thread

like image 194
Firas Al Mannaa Avatar answered Sep 23 '22 15:09

Firas Al Mannaa