
How to manage a Looper thread needed in RxJava

I want to encapsulate in a single Observable the logic that queries a ContentProvider and also registers a ContentObserver on the resulting cursor, so that the Observable supplies continuous updates.

Because the Observable does IO work, I need to subscribe to it on Schedulers.io(). The problem is that I then can't register a ContentObserver, because it needs a thread with a prepared Looper.

What is the recommended way to manage this and still encapsulate everything in a single Observable?

Code to illustrate that:

public Observable<Integer> unreadCountObservable() {
    return Observable.create(subscriber -> {
        new UnreadCountObservable(subscriber);
    });
}

private class UnreadCountObservable {
    private Subscriber subscriber;

    public UnreadCountObservable(Subscriber subscriber) {
        this.subscriber = subscriber;
        Cursor cursor = queryUnread(subscriber);
        cursor.registerContentObserver(observer);
        subscriber.add(Subscriptions.create(() -> {
            cursor.unregisterContentObserver(observer);
            cursor.close();
        }));
    }

    @NonNull
    private Cursor queryUnread(Subscriber subscriber) {
        Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null);
        if(cursor.moveToNext()) {
            Integer count = cursor.getInt(0);
            subscriber.onNext(count);
        } else {
            subscriber.onNext(0);
        }
        return cursor;
    }

    private ContentObserver observer = new ContentObserver(new Handler()) {
        @Override
        public boolean deliverSelfNotifications() {
            return false;
        }

        @Override
        public void onChange(boolean selfChange) {
            Timber.d("New sms data changed");
            queryUnread(subscriber);
        }
    };
}

Note 1: The problem with the above code is that it can't be called with .subscribeOn(Schedulers.io()) because of the registerContentObserver call, and if it is subscribed on the main thread then the queries also run on the main thread.

Note 2: Encapsulating everything in a single Observable is a key requirement and the motive for this question.

My best idea so far is to create a HandlerThread for the activity where I use the Observable and use the Looper from that thread. But I want to know whether there are better alternatives, and whether making a generic scheduler (e.g. looperIoScheduler()) makes sense or can cause problems.

lujop asked May 31 '26 19:05


1 Answer

In an Observable chain you can change the thread as often as you wish. Have a look here.

The function rx.Observable#observeOn(rx.Scheduler) can be placed anywhere in the chain, and it affects the operators that come after it. Try something like this (pseudocode):

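Observable.just(cursor)
        // Switch to the main thread, which has a prepared Looper.
        .observeOn(AndroidSchedulers.mainThread())
        .map(c -> {
            c.registerContentObserver(observer);
            return c;
        })
        // Switch back so that later operators run on the io scheduler.
        .observeOn(Schedulers.io());

// Clean up when the subscriber unsubscribes.
subscriber.add(Subscriptions.create(() -> {
    cursor.unregisterContentObserver(observer);
    cursor.close();
}));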
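
The thread-hopping idea can also be tried outside Android. The following is only a minimal plain-Java sketch, not Android or RxJava code: a single-thread executor named "looper" stands in for a thread with a prepared Looper (for example the HandlerThread suggested in the question), and a second executor named "io" stands in for Schedulers.io(). The class name LooperHopSketch, the method run() and the log strings are hypothetical names made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LooperHopSketch {

    private static String here() {
        return Thread.currentThread().getName();
    }

    /** Runs the four steps and returns a log of which thread each step ran on. */
    static List<String> run() throws Exception {
        List<String> log = new CopyOnWriteArrayList<>();

        // Assumption: stands in for a Looper thread; tasks run one at a time, in order.
        ExecutorService looper =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "looper"));
        // Assumption: stands in for Schedulers.io().
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));

        try {
            // 1. The initial query is IO work, so it runs on the io thread.
            io.submit(() -> { log.add("query on " + here()); }).get();

            // 2. Registering the observer happens on the looper thread.
            looper.submit(() -> { log.add("register on " + here()); }).get();

            // 3. A change notification is delivered on the looper thread,
            //    which hands the re-query back to the io thread.
            CountDownLatch done = new CountDownLatch(1);
            looper.execute(() -> {
                log.add("onChange on " + here());
                io.execute(() -> {
                    log.add("requery on " + here());
                    done.countDown();
                });
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("re-query did not finish in time");
            }
        } finally {
            looper.shutdown();
            io.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

In Android code the "looper" executor would roughly correspond to a HandlerThread whose Looper is passed to the Handler given to the ContentObserver, so that onChange is delivered there while the queries stay on the io scheduler. Whether that fits a particular app is a judgment call that this sketch does not settle.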
R. Zagórski answered Jun 03 '26 09:06