I am implementing an observable that emmits lines from a Resource.
The problem is that this resource really does not like being closed from a different thread that it was created on (it kills a puppy and throws an exception when this happens).
When I dispose the subscription, the resource Cancellable/Disposable is invoked from the main thread, while the observable was subscribed on Schedulers.io().
Here is the Kotlin code:
fun lines(): Observable<String> =
Observable.create { emitter ->
val resource = NetworkResource()
emitter.setCancellable {
resource.close() // <-- main thread :(
}
try {
while (!emitter.isDisposed)
emitter.onNext(resource.readLine()) // <-- blocked here!
} catch (ioe: IOException) {
emitter.tryOnError(ioe) // <-- this also triggers the cancellable
}
}
val disposable = lines()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }
disposable.dispose() // <-- main thread :)
Question: Is it possible to invoke the Cancellable from the correct* thread, taking into account that the subscribing thread is blocked on resource.readLine()?
*Correct thread meaning the one from subscribeOn(Schedures.io()).
EDIT: I am afraid this question does not have a correct answer, unless resource.close() is made thread-safe or some kind of polling on resource.dataReady is implemented so that the thread is not blocked.
Schedulers.io() manages a thread pool, so it may or may not use the same thread to dispose your resource. You will have to use a custom scheduler and the unsubscribeOn() operator to ensure your Observable is subscribed and unsubscribed on the same thread. Something like:
Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
val disposable = lines()
.unsubscribeOn(customScheduler)
.subscribeOn(customScheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With