Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Combining Firebase realtime data listener with RxJava

I am using Firebase in my app, along with RxJava. Firebase is capable of notify your app whenever something changed in the backend data (addition, removals, changes, ...). I am trying to combine the feature of Firebase with RxJava.

The data I am listening for is called Leisure, and the Observable emits LeisureUpdate which contains a Leisure and the type of update (add, remove, moved, changed).

Here is my method which allows to subscribe to this events.

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}

First off, I would like to use Observable.fromCallable() method in order to create the Observable, but I guess it is impossible, since Firebase uses callbacks, right?

I keep a single instance of the Observable in order to always have one Observable where multiple Subscriber can subscribe.

The problem comes when everyone unsubscribe and I need to stop listening for the events in Firebase. I didn't find anyway to make the Observable understand if there is any subscription still. So I keep counting how many calls I got to subscribeToLeisuresUpdates(), with leisureUpdatesSubscriptionsCount.

Then every time someone wants to unsubscribe it has to call

@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}

This is the only way I found to make the Observable emits items when there is a subscriber, but I feel like there must be an easier way, specially understanding when there is no more subscribers listening to the observable.

Anyone who encountered a similar problem or have a different approach?

like image 708
Daniele Vitali Avatar asked Mar 31 '16 10:03

Daniele Vitali


3 Answers

You can use Observable.fromEmitter, something along these lines

    return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
        @Override
        public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
            final ValueEventListener listener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    // process update
                    LeisureUpdate leisureUpdate = ...
                    leisureUpdateEmitter.onNext(leisureUpdate);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
                    mDatabaseReference.removeEventListener(this);
                }
            };
            mDatabaseReference.addValueEventListener(listener);
            leisureUpdateEmitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mDatabaseReference.removeEventListener(listener);
                }
            });
        }
    }, Emitter.BackpressureMode.BUFFER);
like image 145
Francesc Avatar answered Oct 24 '22 12:10

Francesc


Put this in your Observable.create() at the end.

subscriber.add(Subscriptions.create(new Action0() {
                    @Override public void call() {
                        ref.removeEventListener(leisureUpdatesListener);
                    }
                }));
like image 3
Vinay Nagaraj Avatar answered Oct 24 '22 12:10

Vinay Nagaraj


I suggest you to check as reference(or just use it) one of the next libraries:

RxJava : https://github.com/nmoskalenko/RxFirebase

RxJava 2.0: https://github.com/FrangSierra/Rx2Firebase

One of them works with RxJava and the other one with the new RC of RxJava 2.0. If you are interested of it, you can see the differences between both here.

like image 3
Francisco Durdin Garcia Avatar answered Oct 24 '22 12:10

Francisco Durdin Garcia