
Unsubscribing from RxJava2/RxAndroid PublishSubject

I'm trying to replace EventBus with RxAndroid.

I want pageable fragments to subscribe to and unsubscribe from an event source. These fragments get created and discarded relatively quickly, depending on how fast the user swipes to a new page.

In EventBus I was able to add an annotated callback method (i.e. @Subscribe(threadMode = ThreadMode.MAIN)) and register/unregister in the fragment's onStart/onStop methods.

With RxJava2 I now create a PublishSubject object in a class:

public static PublishSubject<List<Long>> m_psUpdatedDays = PublishSubject.create();
public static void publishUpdatedDays(List<Long> lDay) {
    m_psUpdatedDays.onNext(lDay);
}

and subscribe to this publisher in another class by calling the following in the Fragment's onStart method:

m_psUpdatedDays.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Long>>() {
    @Override public void onSubscribe(Disposable d) {}
    @Override public void onNext(List<Long> longs) {
      // ... update Fragment UI here ...
    }
    @Override public void onError(Throwable e) {}
    @Override public void onComplete() {}
});

My question is how can I unsubscribe this new Observer when the Fragment's onStop method is called by the system?

Do I need to store the Disposable object which I get in the onSubscribe and then call .dispose() on it in the onStop method?

Daniel F asked Jun 13 '18 18:06



1 Answer

You can use a CompositeDisposable, which holds a collection of disposables and lets you dispose all of them together.

Create a CompositeDisposable instance in a base fragment and add each disposable to it as you subscribe.

    import io.reactivex.disposables.CompositeDisposable;

    public abstract class BaseFragment extends Fragment {
        protected final CompositeDisposable mCompositeDisposable = new CompositeDisposable();

        @Override
        public void onStop() {
            super.onStop();
            // clear() disposes everything added so far but keeps the container
            // usable, so onStart can add new disposables again. onStop pairs with
            // subscribing in onStart; use onPause or onDestroyView instead if you
            // subscribe in onResume or onCreateView.
            mCompositeDisposable.clear();
        }

        @Override
        public void onDestroy() {
            super.onDestroy();
            // dispose() also disposes everything, but marks the container as
            // disposed: anything added afterwards is disposed immediately.
            mCompositeDisposable.dispose();
        }
    }
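The comments in BaseFragment distinguish clear() from dispose(). The sketch below models that difference in plain Java so it can be run without Android or RxJava on the classpath. ModelCompositeDisposable is a hypothetical stand-in written for this illustration, not RxJava's class; it assumes the documented contract that add() returns false and disposes its argument once the container has been disposed, and it represents each disposable as a Runnable cancel action.

```java
import java.util.ArrayList;
import java.util.List;

class ClearVsDisposeDemo {

    // Hypothetical stand-in for RxJava's CompositeDisposable. It models only
    // the clear()/dispose() contract and is not thread-safe.
    static class ModelCompositeDisposable {
        private final List<Runnable> cancelActions = new ArrayList<>();
        private boolean disposed = false;

        // Stores the action, or runs it at once and returns false if the
        // container has already been disposed.
        boolean add(Runnable cancelAction) {
            if (disposed) {
                cancelAction.run();
                return false;
            }
            cancelActions.add(cancelAction);
            return true;
        }

        // Cancels everything held so far; the container stays usable.
        void clear() {
            List<Runnable> pending = new ArrayList<>(cancelActions);
            cancelActions.clear();
            for (Runnable action : pending) {
                action.run();
            }
        }

        // Cancels everything and rejects all later additions.
        void dispose() {
            disposed = true;
            clear();
        }

        boolean isDisposed() {
            return disposed;
        }
    }

    public static void main(String[] args) {
        ModelCompositeDisposable composite = new ModelCompositeDisposable();
        int[] cancelled = {0};

        composite.add(() -> cancelled[0]++);
        composite.clear(); // plays the role of onStop
        System.out.println("after clear: " + cancelled[0]); // after clear: 1

        // Plays the role of the next onStart: the container still accepts work.
        boolean accepted = composite.add(() -> cancelled[0]++);
        System.out.println("accepted after clear: " + accepted); // true

        composite.dispose(); // plays the role of onDestroy
        boolean acceptedLate = composite.add(() -> cancelled[0]++);
        System.out.println("accepted after dispose: " + acceptedLate); // false
        System.out.println("total cancelled: " + cancelled[0]); // total cancelled: 3
    }
}
```

The late add() is still cancelled rather than silently dropped, which is why a subscription added after dispose() never delivers events to a destroyed fragment.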

In your fragments, subscribe with subscribeWith instead of subscribe. It returns the observer you pass in, and since DisposableObserver implements Disposable, you get a handle right away that you can add to the CompositeDisposable or dispose yourself in a lifecycle callback such as onStop or onDestroy:

    public class MyFragment extends BaseFragment {

        @Override
        public void onStart() {
            super.onStart();
            // subscribeWith returns the DisposableObserver passed in, which is
            // itself a Disposable (plain subscribe(Observer) returns void).
            Disposable disposable = m_psUpdatedDays
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new DisposableObserver<List<Long>>() {
                        @Override
                        public void onNext(List<Long> longs) {
                            // Update Fragment UI here
                        }

                        @Override
                        public void onError(Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    });
            // Track it so the base fragment can dispose it later.
            mCompositeDisposable.add(disposable);
        }
    }
Sarath Kn answered Sep 24 '22 10:09
