
How to tear down shared, infinite Observables with a delay after the last subscriber unsubscribed

We use multiple services in our Android app. These services provide their data as infinite Observables, which are often constructed by combining the Observables of other services. Constructing these Observables can be costly. Furthermore, the services are often consumed in multiple places, so each Observable should be shared among its subscribers.

Example:

  • LocationService, provides an infinite Observable<Location>, that emits the current location
  • ReminderService, provides an infinite Observable<List<Reminder>>, that emits the list of all stored reminders after each change in the dataset
  • LocationAwareReminderService, provides an infinite Observable<List<Reminder>> of nearby reminders by applying Observable.combineLatest to the Observables of the previous two services

First Approach: internal BehaviorSubjects as cache

Each service combines the consumed Observables and subscribes its internal BehaviorSubject to the resulting feed. Consumers can then subscribe to this BehaviorSubject. The LocationAwareReminderService, for example:

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
        Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).subscribe(cache);

        feed = cache.asObservable();
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}

Disadvantage:

  • because of the BehaviorSubject, the feeds of the reminderService and the locationService are never torn down, even if there is no consumer
  • this is especially problematic if they depend on services, like the LocationService, that keep publishing new items frequently
  • because of the subscribe(cache) in the constructor, the service starts to calculate nearby reminders even if no subscriber is present

Advantage:

  • the resulting feed is shared by all subscribers
  • because the feed is never torn down, short periods without a subscriber don't collapse the whole pipe

Second Approach: replay(1).refCount()

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).replay(1).refCount();
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}

Disadvantage:

  • short periods without a Subscriber collapse the whole pipe. On the next subscription the whole pipe has to be reconstructed.
  • A transition from Activity A to Activity B, both subscribing to LocationAwareReminderService.getFeed(), leads to a complete de- and reconstruction of the pipe

Advantage:

  • after the last Subscriber unsubscribed, the LocationAwareReminderService will also unsubscribe from the LocationService.getFeed() and reminderService.getFeed() Observables.
  • The LocationAwareReminderService only starts to provide nearbyReminders after the first Subscriber subscribed
  • the resulting feed is shared by all Subscribers
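
The collapse listed among the disadvantages of this approach can be reproduced without RxJava. The following is a minimal plain-Java sketch, not RxJava's refCount implementation: RefCountedSource is a hypothetical stand-in for the shared pipe, and connectCount counts how often the costly pipe would have to be built.

```java
public class RefCountDemo {

    /** Hypothetical stand-in for a shared pipe that is built on the first subscriber. */
    static final class RefCountedSource {
        private int subscribers = 0;
        private int connectCount = 0;
        private boolean connected = false;

        synchronized void subscribe() {
            // Going from 0 to 1 subscribers builds the (costly) pipe.
            if (subscribers++ == 0) {
                connected = true;
                connectCount++;
            }
        }

        synchronized void unsubscribe() {
            if (subscribers == 0) {
                throw new IllegalStateException("no active subscriber");
            }
            // Dropping to 0 subscribers tears the pipe down immediately.
            if (--subscribers == 0) {
                connected = false;
            }
        }

        synchronized int connectCount() {
            return connectCount;
        }

        synchronized boolean isConnected() {
            return connected;
        }
    }

    public static void main(String[] args) {
        RefCountedSource feed = new RefCountedSource();
        feed.subscribe();   // Activity A starts
        feed.unsubscribe(); // Activity A stops: pipe is torn down
        feed.subscribe();   // Activity B starts: pipe is rebuilt
        System.out.println("connects: " + feed.connectCount());
    }
}
```

With plain reference counting, the A-to-B transition builds the pipe twice, because nothing bridges the moment in which the subscriber count is zero.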

Third Approach: make the refCount unsubscribe with a Timeout

Therefore I built a Transformer that keeps the subscription alive for a defined period after the last Subscriber unsubscribes:

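import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subscriptions.Subscriptions;

public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {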

    private long keepAlive;
    private TimeUnit timeUnit;

    public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
        this.keepAlive = keepAlive;
        this.timeUnit = timeUnit;
    }

    @Override
    public Observable<T> call(Observable<T> upstream) {

        final Observable<T> sharedUpstream = upstream.replay(1).refCount();

        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                if (subscriber.isUnsubscribed())
                    return;
                // subscribe an empty Subscriber that keeps the subscription of refCount() alive
                final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
                // listen to unsubscribe from the subscriber
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        // the subscriber unsubscribed
                        Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
                            @Override
                            public void call(Long ignored) {
                                // unsubscribe the keep alive subscription
                                keepAliveSubscription.unsubscribe();
                            }
                        });
                    }
                }));
                sharedUpstream.subscribe(subscriber);
            }
        });
    }

    // static: the no-op subscriber needs no reference to the enclosing transformer
    public static class NopSubscriber<T> extends Subscriber<T> {
        @Override
        public void onCompleted() {}
        @Override
        public void onError(Throwable e) {}
        @Override
        public void onNext(T o) {}
    }
}

The LocationAwareReminderService utilizing the RxPublishTimeoutCache:

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}

Advantage:

  • The LocationAwareReminderService only starts to provide nearbyReminders after the first Subscriber subscribed
  • the resulting feed is shared by all subscribers
  • short periods without a subscriber don't collapse the whole pipe
  • the whole pipe will be torn down after there has been no subscription for the defined period of time

Disadvantage:

  • Maybe some general flaw?

Questions:

  • Is there already some other way to achieve this in RxJava?
  • Is there some general design flaw in the RxPublishTimeoutCache?
  • Is the overall strategy to compose such services with RxJava flawed?
Andreas Wenger asked Apr 13 '16 11:04



2 Answers

I thought this was an interesting problem, and it seemed like a useful operator to have, so I made Transformers.delayFinalUnsubscribe in rxjava-extras:

observable
  .publish()
  .refCount()
  .compose(Transformers
      .delayFinalUnsubscribe(1, TimeUnit.MINUTES));

It's available in rxjava-extras from 0.7.9.1 on Maven Central. Give it a spin if you like and see if there are any issues.

Dave Moten answered Dec 08 '22 13:12



There is now an overload of refCount that takes a timeout, which does precisely that.
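
To make the behavior of a timed reference count concrete, here is a plain-Java sketch of the idea. It is an illustration under assumptions, not RxJava's implementation: DelayedRefCount and its members are hypothetical names, and a ScheduledExecutorService stands in for an Rx scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedRefCountDemo {

    /** Hypothetical ref-counted pipe whose teardown is delayed by a keep-alive window. */
    static final class DelayedRefCount {
        private final ScheduledExecutorService scheduler;
        private final long keepAlive;
        private final TimeUnit unit;
        private int subscribers = 0;
        private int connectCount = 0;
        private boolean connected = false;
        private long idleEpoch = 0; // identifies the most recent idle period
        private ScheduledFuture<?> pendingTeardown;

        DelayedRefCount(ScheduledExecutorService scheduler, long keepAlive, TimeUnit unit) {
            this.scheduler = scheduler;
            this.keepAlive = keepAlive;
            this.unit = unit;
        }

        synchronized void subscribe() {
            subscribers++;
            if (pendingTeardown != null) {
                // A subscriber arrived inside the window: keep the pipe alive.
                pendingTeardown.cancel(false);
                pendingTeardown = null;
            }
            if (!connected) {
                connected = true;
                connectCount++; // the costly pipe is built here
            }
        }

        synchronized void unsubscribe() {
            if (subscribers == 0) {
                throw new IllegalStateException("no active subscriber");
            }
            if (--subscribers == 0) {
                // Last subscriber left: schedule the teardown instead of running it now.
                final long epoch = ++idleEpoch;
                pendingTeardown = scheduler.schedule(() -> tearDownIfStillIdle(epoch), keepAlive, unit);
            }
        }

        private synchronized void tearDownIfStillIdle(long epoch) {
            // Ignore stale timers from an earlier idle period.
            if (subscribers == 0 && epoch == idleEpoch) {
                connected = false;
                pendingTeardown = null;
            }
        }

        synchronized int connectCount() {
            return connectCount;
        }

        synchronized boolean isConnected() {
            return connected;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        DelayedRefCount feed = new DelayedRefCount(scheduler, 200, TimeUnit.MILLISECONDS);
        feed.subscribe();   // Activity A starts
        feed.unsubscribe(); // Activity A stops: teardown is only scheduled
        feed.subscribe();   // Activity B starts inside the window: pipe is reused
        System.out.println("connects: " + feed.connectCount());
        feed.unsubscribe();
        Thread.sleep(500);  // wait past the keep-alive window
        System.out.println("connected: " + feed.isConnected());
        scheduler.shutdown();
    }
}
```

The epoch check is a design choice of this sketch: a timer that fires late from an earlier idle period must not tear down a pipe whose current keep-alive window has not elapsed yet.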

urSus answered Dec 08 '22 15:12
