
RxJava: observeOn, subscribeOn, and doFinally, switching between IO and UI thread

I am running into an issue where my observable is subscribed on an IO thread and observed on the Android main (UI) thread, but the doFinally operator runs on the IO thread. It needs to run on the UI thread.

The use case is almost exactly the same as the one in this Medium article.

I essentially want to show a ProgressBar when the Observable is subscribed to and hide the ProgressBar when the Observable is terminated or finished.

The error I am getting is: java.lang.IllegalStateException: The current thread must have a looper!

Can anyone help me move the doFinally action back to the UI thread which has a looper? Or am I missing some other piece of information?

EDIT: The use case workflow is:

-> Launch Activity

-> initialize

-> execute observable stream

-> Start new Activity and finish current activity

-> New activity

-> Start original activity and finish

-> repeat initialize

Thank you very much.

Details:

  • RxJava 2.0.7
  • RxAndroid 2.0.1
  • Android SDK: min 14, target 25

Example Code

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

Stack Trace:

FATAL EXCEPTION: RxCachedThreadScheduler-1
  Process: com.example.android.demo.customerfirst.alpha, PID: 16685
  java.lang.IllegalStateException: The current thread must have a looper!
      at android.view.Choreographer$1.initialValue(Choreographer.java:96)
      at android.view.Choreographer$1.initialValue(Choreographer.java:91)
      at java.lang.ThreadLocal$Values.getAfterMiss(ThreadLocal.java:430)
      at java.lang.ThreadLocal.get(ThreadLocal.java:65)
      at android.view.Choreographer.getInstance(Choreographer.java:192)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:600)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:575)
      at android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)
      at android.animation.ValueAnimator.end(ValueAnimator.java:998)
      at android.graphics.drawable.AnimatedVectorDrawable.stop(AnimatedVectorDrawable.java:439)
      at android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)
      at android.widget.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)
      at android.view.View.dispatchVisibilityChanged(View.java:8643)
      at android.view.View.setFlags(View.java:9686)
      at android.view.View.setVisibility(View.java:6663)
      at android.widget.ProgressBar.setVisibility(ProgressBar.java:1563)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator$3.run(ProductListPresenterMediator.java:56)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java:144)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.onComplete(ObservableDoFinally.java:94)
      at io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)
      at io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.java:84)
      at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
      at io.reactivex.Scheduler$1.run(Scheduler.java:138)
      at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
      at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
      at java.util.concurrent.FutureTask.run(FutureTask.java:237)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
      at java.lang.Thread.run(Thread.java:818)
Asked by JTT on Dec 23 '22 at 18:12


1 Answer

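The issue occurred because I was not disposing of the subscription when an activity was finished or destroyed.

Now each activity/view tells the presenter when it is stopped or destroyed, and the presenter disposes of the subscription. I also moved subscribeOn and observeOn above doOnSubscribe and doFinally, so the doFinally action runs downstream of observeOn, on the main thread.

This appears to have solved my issue.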

@Override
public void initialize() {
    if (!isViewAttached()) {
        throw new ViewNotAttachedException();
    }
    disposable = listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // subscribeOn moved here: the source runs on the IO scheduler
            .observeOn(schedulerProvider.main()) // Thread switch: events below this line arrive on the main thread
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // Runs on the thread that calls subscribe()
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
}
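
The position of doFinally relative to observeOn in initialize() above decides which thread it runs on. The following is a plain-Java model of that idea, not RxJava code: two single-thread executors stand in for schedulerProvider.io() and schedulerProvider.main(), and the class name, method name, and thread names are made up for illustration. A hook that runs before the hand-off executes on the worker thread (the situation in the question), while a hook that runs after the hand-off executes on the target thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative model only; executors play the role of RxJava schedulers.
class OperatorPositionSketch {

    /** Returns the thread names on which the "before hop" and "after hop" hooks ran. */
    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        CompletableFuture<String> beforeHop = new CompletableFuture<>();
        CompletableFuture<String> afterHop = new CompletableFuture<>();
        try {
            // Plays the role of subscribeOn(io): the source does its work on the io thread.
            io.execute(() -> {
                // A hook placed upstream of the hand-off runs here, on the io thread.
                beforeHop.complete(Thread.currentThread().getName());
                // Plays the role of observeOn(main): completion is re-posted to the main executor.
                main.execute(() ->
                        // A hook placed downstream of the hand-off runs on the main thread.
                        afterHop.complete(Thread.currentThread().getName()));
            });
            List<String> result = new ArrayList<>();
            result.add(beforeHop.get(5, TimeUnit.SECONDS));
            result.add(afterHop.get(5, TimeUnit.SECONDS));
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("sketch did not complete", e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> threads = run();
        System.out.println("hook before hop ran on: " + threads.get(0)); // io-thread
        System.out.println("hook after hop ran on: " + threads.get(1));  // main-thread
    }
}
```

In the RxJava chain the same reasoning applies to terminal events: operators declared after observeOn receive onComplete and onError on the scheduler passed to observeOn.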

@Override
public void dispose() {
    if (disposable != null) {
        disposable.dispose();
    }
}
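
The dispose() method above only helps if the view actually calls it when it stops. Here is a minimal sketch of that wiring in plain Java, with assumptions stated up front: the Disposable interface is a hypothetical stand-in for io.reactivex.disposables.Disposable, and the Presenter and View classes are simplified placeholders for the real presenter and Activity, whose code is not shown in the post.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative model only; none of these types are the real Android or RxJava classes.
class LifecycleDisposeSketch {

    /** Hypothetical stand-in for io.reactivex.disposables.Disposable. */
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    /** Hypothetical presenter: keeps the subscription handle and releases it on request. */
    static class Presenter {
        private Disposable disposable;

        void initialize() {
            // In the real presenter this handle is the value returned by subscribe(...).
            final AtomicBoolean disposed = new AtomicBoolean(false);
            disposable = new Disposable() {
                @Override public void dispose() { disposed.set(true); }
                @Override public boolean isDisposed() { return disposed.get(); }
            };
        }

        void dispose() {
            if (disposable != null) {
                disposable.dispose();
            }
        }

        boolean isDisposed() {
            return disposable == null || disposable.isDisposed();
        }
    }

    /** Hypothetical view; onStart and onStop mirror the Activity callbacks of the same name. */
    static class View {
        final Presenter presenter = new Presenter();

        void onStart() {
            presenter.initialize();
        }

        void onStop() {
            // The view tells the presenter it is going away, so no late callback touches it.
            presenter.dispose();
        }
    }

    public static void main(String[] args) {
        View view = new View();
        view.onStart();
        System.out.println("disposed after onStart: " + view.presenter.isDisposed()); // false
        view.onStop();
        System.out.println("disposed after onStop: " + view.presenter.isDisposed());  // true
    }
}
```

Whether to dispose in onStop or onDestroy depends on how long the result is still useful to the view; the post mentions both.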
Answered by JTT on Dec 29 '22 at 01:12