I use Observable.create()
to create an observable to perform some work on a scheduler (e.g. Schedulers.io()
and then return a result on AndroidSchedulers.mainThread()
.
val subscription = observable<T> {
try {
// perform action synchronously
it.onNext(action.invoke(context, args))
it.onCompleted()
} catch (t: Exception) {
it.onError(t)
}
}.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
// handle result here
result.set(it)
},
{
// handle error here
errorHandler.handleTaskError(model, this, it)
},
{
// notify completed
model.completeTask(this)
}
)
The operation inside action.invoke()
is synchronous and might be a blocking IO operation. When user decides to cancel it, I unsubscribe from the observable: subscription.unsubscribe()
However, the I/O operation is not being interrupted. Is there any rx-java API to interrupt the operation?
When you call yourSubscription.unsubscribe();
, Rx will call your unsubscribe code.
This unsuscribe code will be the Subscription
class that you can add
to your subscriber
when you create your Observable
.
Observable<Object> obs = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// perform unsubscription
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscriber.onNext(/**...*/);
subscriber.onCompleted();
}
So in the unsubscribe
method, you can interrupt your job, if you have a way to do so.
Please note that the unsubscribe method is called when you unsubscribe from an Observable or when An Observable when it's completed. (it will unsubscribe by itself)
edit : taking vladimir mironov comment in account
RxJava schedulers use Furure.cancel(true) to interrupt a running action so if you have an interrupt sensitive operation (such as Thread.sleep()
), you should see the thread interruption, although you can't do much about it because at that point, the entire downstream chain has been unsubscribed and no further events are can reach your subscriber.
If your action is not sensitive to interruption and it doesn't provide any standard means to interrupt it (such as calling close()
on a socket, or cancel()
on a JDBC statement), RxJava can't do much about it.
If you can call some close() method asynchronously, consider using using()
which let's you specify a cleanup callback that can close your resource on unsubscription.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With