I am using Firebase in my app, along with RxJava. Firebase can notify your app whenever something changes in the backend data (additions, removals, changes, ...). I am trying to combine this feature of Firebase with RxJava.
The data I am listening for is called Leisure, and the Observable emits LeisureUpdate, which contains a Leisure and the type of update (added, removed, moved, changed).
Here is my method for subscribing to these events:
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}
First off, I would like to use the Observable.fromCallable() method to create the Observable, but I guess that is impossible, since Firebase uses callbacks, right?
I keep a single instance of the Observable so that there is always one Observable to which multiple Subscribers can subscribe.
The problem comes when everyone unsubscribes and I need to stop listening for events in Firebase.
I didn't find any way to make the Observable know whether there are any subscriptions left. So I keep counting how many calls I got to subscribeToLeisuresUpdates(), with leisureUpdatesSubscriptionsCount.
Then every time someone wants to unsubscribe, they have to call:
@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}
This is the only way I found to make the Observable emit items only while there is a subscriber, but I feel like there must be an easier way, especially for detecting when there are no more subscribers listening to the Observable.
Has anyone encountered a similar problem, or does anyone have a different approach?
You can use Observable.fromEmitter, something along these lines:
return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
    @Override
    public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
        final ValueEventListener listener = new ValueEventListener() {
            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
                // process update
                LeisureUpdate leisureUpdate = ...
                leisureUpdateEmitter.onNext(leisureUpdate);
            }

            @Override
            public void onCancelled(DatabaseError databaseError) {
                leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
                mDatabaseReference.removeEventListener(this);
            }
        };
        mDatabaseReference.addValueEventListener(listener);
        leisureUpdateEmitter.setCancellation(new Cancellable() {
            @Override
            public void cancel() throws Exception {
                mDatabaseReference.removeEventListener(listener);
            }
        });
    }
}, Emitter.BackpressureMode.BUFFER);
Put this at the end of the call() method inside your Observable.create(), so that the listener is removed when the subscriber unsubscribes:
subscriber.add(Subscriptions.create(new Action0() {
    @Override
    public void call() {
        ref.removeEventListener(leisureUpdatesListener);
    }
}));
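A removal action like the one above runs per subscription, so it works best when a single upstream listener is shared by all subscribers. In RxJava 1.x the share() operator (shorthand for publish().refCount()) does this: it subscribes upstream when the first subscriber arrives and unsubscribes when the last one leaves. The following is a library-free Java model of that reference-counting idea, not RxJava's actual implementation. RefCountedSource, Backend, Handle and InMemoryBackend are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Library-free model of a ref-counted event source (all names are hypothetical). */
class RefCountedSource<T> {

    /** Stand-in for the backend: something a listener can be attached to and detached from. */
    interface Backend<E> {
        void addListener(Consumer<E> listener);
        void removeListener(Consumer<E> listener);
    }

    /** Handle returned to each subscriber, playing the role of a Subscription. */
    interface Handle {
        void unsubscribe();
    }

    private final Backend<T> backend;
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    // The single listener registered with the backend. It is kept in a field so that
    // the exact same instance can be passed to removeListener later.
    private final Consumer<T> upstream = this::dispatch;

    RefCountedSource(Backend<T> backend) {
        this.backend = backend;
    }

    synchronized Handle subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            backend.addListener(upstream); // first subscriber: start listening
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        // remove() returns false on a repeated call, so unsubscribing twice is harmless.
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            backend.removeListener(upstream); // last subscriber gone: stop listening
        }
    }

    private synchronized void dispatch(T event) {
        // Iterate over a copy so a subscriber may unsubscribe from within its callback.
        for (Consumer<T> s : new ArrayList<>(subscribers)) {
            s.accept(event);
        }
    }

    /** Tiny in-memory backend, present only to exercise the sketch. */
    static final class InMemoryBackend<E> implements Backend<E> {
        private final List<Consumer<E>> listeners = new ArrayList<>();

        public void addListener(Consumer<E> listener) { listeners.add(listener); }

        public void removeListener(Consumer<E> listener) { listeners.remove(listener); }

        int listenerCount() { return listeners.size(); }

        void emit(E event) {
            for (Consumer<E> l : new ArrayList<>(listeners)) {
                l.accept(event);
            }
        }
    }
}
```

With the real library, the equivalent would presumably be to call share() on the Observable returned by Observable.create() and hand that shared instance to every subscriber, which would make the manual subscription counter unnecessary.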
I suggest you check out (or simply use) one of the following libraries as a reference:
RxJava : https://github.com/nmoskalenko/RxFirebase
RxJava 2.0: https://github.com/FrangSierra/Rx2Firebase
The first works with RxJava and the second with the new release candidate of RxJava 2.0. If you are interested, you can see the differences between the two here.