
Cancelling an Observable in RxJava

Tags:

rx-java

I have an observable that is performing a download. However, I want to cancel that observable when I click a button.

How is that possible?

Thank you.

Fábio Carballo asked Oct 14 '15 13:10


3 Answers

You can't really tell an Observable to cancel a network request, because RxJava has no knowledge of the code that's running inside the Observable. What you can do is use doOnUnsubscribe to check whether the network request is still running and, if it is, cancel it. Then just unsubscribe when you want to cancel.
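The mechanism can be sketched without RxJava on the classpath, so it runs on a plain JDK. Everything in this block is a hypothetical stand-in: `Download` plays the role of whatever cancellable handle your network client exposes, and `HookedSubscription` models only the "run a hook once on unsubscribe" behaviour. In RxJava 1.x itself you would register the same hook with doOnUnsubscribe rather than writing a class like this.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSketch {

    /** Hypothetical stand-in for a cancellable network request. */
    static final class Download {
        private final AtomicBoolean running = new AtomicBoolean(true);

        boolean isRunning() { return running.get(); }

        void cancel() { running.set(false); }
    }

    /** Simplified model of a subscription that runs a hook once when unsubscribed. */
    static final class HookedSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        private final Runnable onUnsubscribe;

        HookedSubscription(Runnable onUnsubscribe) {
            this.onUnsubscribe = onUnsubscribe;
        }

        void unsubscribe() {
            // compareAndSet guarantees the hook runs at most once,
            // even if the button is clicked repeatedly.
            if (unsubscribed.compareAndSet(false, true)) {
                onUnsubscribe.run();
            }
        }

        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Ties the download's lifetime to the subscription, as the answer describes. */
    static HookedSubscription start(Download download) {
        return new HookedSubscription(() -> {
            // Check whether the request is still running; if so, cancel it.
            if (download.isRunning()) {
                download.cancel();
            }
        });
    }

    public static void main(String[] args) {
        Download download = new Download();
        HookedSubscription subscription = start(download);

        subscription.unsubscribe(); // what the button's click handler would do

        System.out.println("running after cancel: " + download.isRunning());
    }
}
```

The point of the design is that the click handler only needs the subscription; it never touches the network client directly.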

FriendlyMikhail answered Oct 23 '22 16:10


This should give you the general idea. The summary is to use Observable.using() and subscribe to the Observable with a Subscriber so you can call subscriber.unsubscribe() when the user clicks the button to cancel.

Here's an outline.

Subscriber<T> subscriber = ...;
Observable
    // create a stream from a socket and dispose of the socket
    // appropriately
    .using(socketCreator(host, port, quietTimeoutMs),
           socketObservableFactory(charset),
           socketDisposer(), true)
    // cannot ask the host to slow down, so buffer on
    // backpressure
    .onBackpressureBuffer()
    .subscribe(subscriber);

When the button is clicked, call:

subscriber.unsubscribe();

This stops the observable stream and runs the dispose action supplied by socketDisposer(), which stops the network activity.

You haven't specified the nature of your download (FTP, HTTP, etc.), but the answer doesn't really change, because all of those transports fit this pattern.

Dave Moten answered Oct 23 '22 18:10


In my case I had a PublishSubject that emitted items under a certain condition. Under some conditions I wanted to cancel it, so I added a sealed class that holds the state:

sealed class DelayedItem {
   object Ignore : DelayedItem()
   data class Handle(val text: String) : DelayedItem()
}

And my subscriber was debounced like this:

myPublishSubject
    .subscribeOn(Schedulers.io())
    .debounce(500, TimeUnit.MILLISECONDS)
    .subscribe {
        if (it is DelayedItem.Handle)
            handleItem(it)
    }

Hope it helps!

user3193413 answered Oct 23 '22 16:10