We utilize mutliple services in our Android App. These services provide their data as infinite Observables
, that are often constructed by combining the Observables
of other services.
The construction of these Observables
can be costly. Furthermore the services are often consumed in multiple places so their Observable
should be shared among the subscribers.
LocationService
, provides an infinite Observable<Location>
, that emits the current locationReminderService
, provides an infinite Observable<List<Reminder>>
, that emits the list of all stored reminders after each change in the datasetLocationAwareReminderService
, provides an infinite Observable<List<Reminders>>
of nearby reminders by Observable.combineLatest
the Observables
of the previous two servicesEach services combines the consumed Observables
and subscribes it's internal BehaviorSubject
to the resulting feed. Consumers can then subscribe the this BehaviorSubject
.
The LocationAwareReminderService
for example:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).subscribe(cache);
feed = cache.asObservable();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Disadvantage:
Advantage:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).replay(1).refCount();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Disadvantage:
Subscriber
collapse the whole pipe. During the next subscription the whole pipe needs to be reconstructed.Activity
A to Activity
B, both subscribing to the LocationAwareReminderService.getFeed()
, leads to a complete de- and reconstruction of the pipeAdvantage:
Subscriber
unsubscribed, the LocationAwareReminderService
will also unsubscribe from the LocationService.getFeed()
and reminderService.getFeed()
Observables
.LocationAwareReminderService
only starts to provide nearbyReminders after the first Subscriber
subscribedSubscriber
sTherefor I build a Transformer
that keeps the subscription alive for a defined period after the last Subscriber
unsubscribes
public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {
private long keepAlive;
private TimeUnit timeUnit;
public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
this.keepAlive = keepAlive;
this.timeUnit = timeUnit;
}
@Override
public Observable<T> call(Observable<T> upstream) {
final Observable<T> sharedUpstream = upstream.replay(1).refCount();
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
if (subscriber.isUnsubscribed())
return;
// subscribe an empty Subscriber that keeps the subsription of refCount() alive
final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
// listen to unsubscribe from the subscriber
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// the subscriber unsubscribed
Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
@Override
public void call(Long _) {
// unsubscribe the keep alive subscription
keepAliveSubscription.unsubscribe();
}
});
}
}));
sharedUpstream.subscribe(subscriber);
}
});
}
public class NopSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(T o) {}
}
}
The LocationAwareReminderService
utilizing the RxPublishTimeoutCache
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Advantage:
LocationAwareReminderService
only starts to provide nearbyReminders after the first Subscriber
subscribedDisadvantage:
RxPublishTimeoutCache
?I thought this was an interesting problem and seemed a useful operator to have so I made Transformers.delayFinalUnsubscribe
in rxjava-extras:
observable
.publish()
.refCount()
.compose(Transformers
.delayFinalUnsubscribe(1, TimeUnit.MINUTES));
It's available in rxjava-extras from 0.7.9.1 on Maven Central. Give it a spin if you like and see if there are any issues.
Now there is a overload of refCount that takes a timeout, which does precisely that
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With