Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Invoke RxJava2 cancellable/disposable from correct thread

I am implementing an observable that emmits lines from a Resource.

The problem is that this resource really does not like being closed from a different thread that it was created on (it kills a puppy and throws an exception when this happens).

When I dispose the subscription, the resource Cancellable/Disposable is invoked from the main thread, while the observable was subscribed on Schedulers.io().

Here is the Kotlin code:

fun lines(): Observable<String> =
        Observable.create { emitter ->
            val resource = NetworkResource()
            emitter.setCancellable {
                resource.close() // <-- main thread :(
            }
            try {
                while (!emitter.isDisposed)
                    emitter.onNext(resource.readLine()) // <-- blocked here!
            } catch (ioe: IOException) {
                emitter.tryOnError(ioe) // <-- this also triggers the cancellable
            }
        }

val disposable = lines()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { Log.i(TAG, "Line: $it" }

disposable.dispose() // <-- main thread :)

Question: Is it possible to invoke the Cancellable from the correct* thread, taking into account that the subscribing thread is blocked on resource.readLine()?

*Correct thread meaning the one from subscribeOn(Schedures.io()).

EDIT: I am afraid this question does not have a correct answer, unless resource.close() is made thread-safe or some kind of polling on resource.dataReady is implemented so that the thread is not blocked.

like image 501
ESala Avatar asked Nov 08 '25 10:11

ESala


1 Answers

Schedulers.io() manages a thread pool, so it may or may not use the same thread to dispose your resource. You will have to use a custom scheduler and the unsubscribeOn() operator to ensure your Observable is subscribed and unsubscribed on the same thread. Something like:

Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor());

val disposable = lines()
        .unsubscribeOn(customScheduler)
        .subscribeOn(customScheduler)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { Log.i(TAG, "Line: $it" }
like image 89
Vinay Nagaraj Avatar answered Nov 10 '25 03:11

Vinay Nagaraj



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!