RxJava: When should I be concerned with unsubscribing?

I want to make sure that I am not creating any memory leaks when I use RxJava. Please let me know if these are the proper ways to handle each case...

CASE 1

If I create an Observable and a Subscription in the same scope, the GC will take care of disposing of them, right? Do I have to call unsubscribe() here?

public static void createObservableAndSubscribe() {
    Observable<Integer> source = Observable.just(1, 6, 3, 2, 562, 4, 6);

    Subscription sub = source.subscribe(i -> System.out.println(i));

    sub.unsubscribe(); // unnecessary I assume? GC should take care of it?
}

CASE 2

But what about an infinite Observable and a temporary subscriber? Say I have a GUI that subscribes to an infinite Observable. When the GUI gets destroyed, do I have to explicitly unsubscribe the Subscriber?

public void attachSubscriber(Observable<Integer> infiniteObservable) {
    Subscription sub = infiniteObservable.subscribe(i -> countLabel.setText(i.toString()));

    // do stuff, have this subscription persist for the life of a GUI
    sub.unsubscribe(); // must I do this when the GUI is destroyed?
}
tmn asked Jun 24 '15 19:06

1 Answer

Case 1:

The source is a synchronous, finite Observable, so the stream completes and unsubscribes itself: the subscribe method inserts a SafeSubscriber into the call chain, and it calls unsubscribe on termination (error or completion). Note that because the source is synchronous, sub is not assigned until the stream has finished, so by the time sub.unsubscribe() runs there is nothing left to unsubscribe.
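To make the ordering concrete, here is a minimal library-free sketch of Case 1. It does not use RxJava; SimpleSubscription and subscribeSync are hypothetical names that only model the behaviour described above, assuming the source emits on the calling thread and the subscription is released on completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Library-free model of Case 1. SimpleSubscription and subscribeSync are
// hypothetical names for illustration, not part of the RxJava API.
class SynchronousSubscribeSketch {

    /** Stand-in for a Subscription: just a flag that can be flipped once. */
    static final class SimpleSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() { unsubscribed.set(true); }

        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /**
     * Emits every item on the calling thread, then unsubscribes on completion
     * (the role the answer attributes to SafeSubscriber).
     */
    static SimpleSubscription subscribeSync(int[] items, List<String> log) {
        SimpleSubscription subscription = new SimpleSubscription();
        for (int item : items) {
            if (subscription.isUnsubscribed()) {
                return subscription;
            }
            log.add("onNext " + item);
        }
        log.add("onCompleted");
        subscription.unsubscribe(); // termination releases the subscription
        return subscription;        // the caller only receives the handle now
    }

    /** Subscribes to three items and records what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleSubscription sub = subscribeSync(new int[] {1, 6, 3}, log);
        log.add("subscribe returned, unsubscribed=" + sub.isUnsubscribed());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the "subscribe returned" entry is always logged last and already reports unsubscribed=true, which is why a later explicit unsubscribe call has nothing to do.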

Case 2:

I recommend ensuring that unsubscribe is called in this case, to stop the stream and release its resources. A more flexible pattern, which works with both synchronous and asynchronous sources, is to create the Subscriber beforehand rather than rely on the value returned by the subscribe method:

// Subscribers.create accepts an onNext lambda; Subscribers.from expects a full Observer
Subscriber<Integer> subscriber = Subscribers.create(i -> countLabel.setText(i.toString()));
infiniteObservable.subscribe(subscriber);

Now if you call subscriber.unsubscribe() from a thread other than the one on which the observable was subscribed, the stream will stop even if the observable is a synchronous source.
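The reason this works can be sketched without the library: a synchronous source blocks whichever thread subscribed, so the only way to stop it is for another thread to flip a flag that the source checks between emissions. The following is a rough model under that assumption; FlagSubscriber and emitForever are hypothetical names, not RxJava classes, and the 50 ms pause is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Library-free model of cross-thread unsubscription. FlagSubscriber and
// emitForever are hypothetical names for illustration, not RxJava API.
class CrossThreadUnsubscribeSketch {

    /** Stand-in for a Subscriber created up front, so other threads can hold it. */
    static final class FlagSubscriber {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        final AtomicLong received = new AtomicLong();

        void onNext(long value) { received.incrementAndGet(); }

        void unsubscribe() { unsubscribed.set(true); }

        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Stand-in for a synchronous infinite source: checks the flag before each emission. */
    static void emitForever(FlagSubscriber subscriber) {
        long i = 0;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    }

    /**
     * Runs the source on a worker thread, unsubscribes from the calling thread,
     * and reports whether the emission loop stopped within two seconds.
     */
    static boolean stopsAfterUnsubscribe() {
        FlagSubscriber subscriber = new FlagSubscriber();
        CountDownLatch finished = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            emitForever(subscriber); // blocks the worker, like a synchronous subscribe
            finished.countDown();
        });
        worker.setDaemon(true);
        worker.start();
        try {
            Thread.sleep(50);         // let the source emit for a moment
            subscriber.unsubscribe(); // called from a different thread
            return finished.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + stopsAfterUnsubscribe());
    }
}
```

Had the code waited for a handle returned by emitForever, there would have been nothing to call unsubscribe on until the loop ended, which for an infinite source is never. Holding the subscriber before subscribing avoids that.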

Dave Moten answered Nov 10 '22 00:11