Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava: How to interrupt thread on unsubscribe?

I use Observable.create() to create an observable to perform some work on a scheduler (e.g. Schedulers.io() and then return a result on AndroidSchedulers.mainThread().

val subscription = observable<T> {
        try {
            // perform action synchronously
            it.onNext(action.invoke(context, args))
            it.onCompleted()
        } catch (t: Exception) {
            it.onError(t)
        }
    }.subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    {
                        // handle result here
                        result.set(it)
                    },
                    {
                        // handle error here
                        errorHandler.handleTaskError(model, this, it)
                    },
                    {
                        // notify completed
                        model.completeTask(this)
                    }
            )

The operation inside action.invoke() is synchronous and might be a blocking IO operation. When user decides to cancel it, I unsubscribe from the observable: subscription.unsubscribe()

However, the I/O operation is not being interrupted. Is there any rx-java API to interrupt the operation?

like image 402
ntoskrnl Avatar asked May 12 '15 12:05

ntoskrnl


2 Answers

When you call yourSubscription.unsubscribe();, Rx will call your unsubscribe code.

This unsuscribe code will be the Subscription class that you can add to your subscriber when you create your Observable.

Observable<Object> obs = Observable.create(subscriber -> {

      subscriber.add(new Subscription() {
            @Override
            public void unsubscribe() {
                 // perform unsubscription
            }

            @Override
            public boolean isUnsubscribed() {
                return false;
            }
        });
      subscriber.onNext(/**...*/);
      subscriber.onCompleted();
}

So in the unsubscribe method, you can interrupt your job, if you have a way to do so.

Please note that the unsubscribe method is called when you unsubscribe from an Observable or when An Observable when it's completed. (it will unsubscribe by itself)

edit : taking vladimir mironov comment in account

like image 142
dwursteisen Avatar answered Oct 15 '22 07:10

dwursteisen


RxJava schedulers use Furure.cancel(true) to interrupt a running action so if you have an interrupt sensitive operation (such as Thread.sleep()), you should see the thread interruption, although you can't do much about it because at that point, the entire downstream chain has been unsubscribed and no further events are can reach your subscriber.

If your action is not sensitive to interruption and it doesn't provide any standard means to interrupt it (such as calling close() on a socket, or cancel() on a JDBC statement), RxJava can't do much about it.

If you can call some close() method asynchronously, consider using using() which let's you specify a cleanup callback that can close your resource on unsubscription.

like image 28
akarnokd Avatar answered Oct 15 '22 07:10

akarnokd