
Do 'intermediate IObservables' without final subscribers get kept in memory for the lifetime of the root IObservable?

For example, consider this:

    public IDisposable Subscribe<T>(IObserver<T> observer)
    {
        return eventStream.Where(e => e is T).Cast<T>().Subscribe(observer);
    }

The eventStream is a long lived source of events. A short lived client will use this method to subscribe for some period of time, and then unsubscribe by calling Dispose on the returned IDisposable.

However, while the eventStream still exists and should be kept in memory, two new IObservables have been created by this method: the one returned by the Where() method, which is presumably held in memory by the eventStream, and the one returned by the Cast<T>() method, which is presumably held in memory by the one returned by the Where() method.

How will these 'intermediate IObservables' (is there a better name for them?) get cleaned up? Or will they now exist for the lifetime of the eventStream, even though they no longer have subscriptions, no one references them except their source IObservable, and they will therefore never have subscriptions again?

If they are cleaned up by informing their parent they no longer have subscriptions, how do they know nothing else has taken a reference to them and may at some point later subscribe to them?

Tyson asked Oct 09 '22 09:10


2 Answers

"However, while the eventStream still exists and should be kept in memory, two new IObservables have been created by this method: the one returned by the Where() method, which is presumably held in memory by the eventStream, and the one returned by the Cast<T>() method, which is presumably held in memory by the one returned by the Where() method."

You have this backward. Let's walk through the chain of what is going on.

    IObservable<object> eventStream; // you have this defined and assigned somewhere (element type assumed to be object here)

    public IDisposable Subscribe<T>(IObserver<T> observer)
    {
        // let's break this method into multiple lines

        // Where keeps the source element type, so this is not yet IObservable<T>
        IObservable<object> whereObs = eventStream.Where(e => e is T);
        // whereObs now has a reference to eventStream (and thus will keep it alive),
        // but eventStream knows nothing of whereObs (thus whereObs will not be kept alive by eventStream)
        IObservable<T> castObs = whereObs.Cast<T>();
        // as with whereObs, castObs has a reference to whereObs,
        // but no one has a reference to castObs
        IDisposable ret = castObs.Subscribe(observer);
        // here is where it gets tricky.
        return ret;
    }

What ret does or does not have a reference to depends on the implementation of the various observables. From what I have seen in Reflector in the Rx library and the operators I have written myself, most operators do not return disposables that have a reference to the operator observable itself.

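For example, a basic implementation of Where would be something like this (typed directly in the editor, no error handling):

    // needs: using System; using System.Reactive.Linq;
    // (an extension method must be declared inside a static class)
    static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> filter)
    {
        return Observable.Create<T>(obs =>
        {
            // wrap the downstream observer; only obs and filter are captured here
            return source.Subscribe(
                v => { if (filter(v)) obs.OnNext(v); },
                obs.OnError,
                obs.OnCompleted);
        });
    }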

Notice that the disposable returned will have a reference to the filter function via the observer that is created, but will not have a reference to the Where observable. Cast can be easily implemented using the same pattern. In essence, the operators become observer wrapper factories.

The implication of all this to the question at hand is that the intermediate IObservables are eligible for garbage collection by the end of the method. The filter function passed to Where stays around as long as the subscription does, but once the subscription is disposed or completed, only eventStream remains (assuming it is still alive).

EDIT: In response to supercat's comment, let's look at how the compiler might rewrite this, or how you would implement it without closures.

    class WhereObserver<T> : IObserver<T>
    {
        // "base" is a C# keyword, so the wrapped observer is called inner here
        public WhereObserver(IObserver<T> inner, Func<T, bool> filter)
        {
            _inner = inner;
            _filter = filter;
        }

        readonly IObserver<T> _inner;
        readonly Func<T, bool> _filter;

        // interface members must be public
        public void OnNext(T value)
        {
            if (_filter(value)) _inner.OnNext(value);
        }

        public void OnError(Exception ex) { _inner.OnError(ex); }
        public void OnCompleted() { _inner.OnCompleted(); }
    }

    class WhereObservable<T> : IObservable<T>
    {
        public WhereObservable(IObservable<T> source, Func<T, bool> filter)
        {
            _source = source;
            _filter = filter;
        }

        readonly IObservable<T> _source;
        readonly Func<T, bool> _filter;

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // the new observer gets the filter, but no reference to this observable
            return _source.Subscribe(new WhereObserver<T>(observer, _filter));
        }
    }

    // declared inside a static class, as any extension method must be
    static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> filter)
    {
        return new WhereObservable<T>(source, filter);
    }

You can see that the observer does not need any reference to the observable that generated it and the observable has no need to track the observers it creates. We didn't even make any new IDisposable to return from our subscribe.
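To make that concrete, here is a rough, self-contained sketch (BCL only, no Rx package) that restates the two classes above in compact form and runs them against a source that exposes how many observers it holds. `CountingSource`, `Collector` and `CountDemo` are hypothetical names made up for this illustration; they are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hot source that exposes how many observers it currently holds.
public sealed class CountingSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public int ObserverCount => _observers.Count;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    public void Publish(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _list;
        private readonly IObserver<T> _observer;
        public Unsubscriber(List<IObserver<T>> list, IObserver<T> observer)
        {
            _list = list;
            _observer = observer;
        }
        public void Dispose() => _list.Remove(_observer);
    }
}

// Compact restatement of the wrapper observer: it knows the filter, not the observable.
public sealed class WhereObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly Func<T, bool> _filter;
    public WhereObserver(IObserver<T> inner, Func<T, bool> filter) { _inner = inner; _filter = filter; }
    public void OnNext(T value) { if (_filter(value)) _inner.OnNext(value); }
    public void OnError(Exception ex) => _inner.OnError(ex);
    public void OnCompleted() => _inner.OnCompleted();
}

// Compact restatement of the operator: it keeps no list of the observers it creates.
public sealed class WhereObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Func<T, bool> _filter;
    public WhereObservable(IObservable<T> source, Func<T, bool> filter) { _source = source; _filter = filter; }
    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new WhereObserver<T>(observer, _filter));
}

// Final subscriber that just records what it receives.
public sealed class Collector<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception ex) { }
    public void OnCompleted() { }
}

public static class CountDemo
{
    // Returns { observers with both subscribed, after first Dispose, after second Dispose,
    //           items seen by a, items seen by b }.
    public static int[] Run()
    {
        var source = new CountingSource<int>();
        var evens = new WhereObservable<int>(source, v => v % 2 == 0);
        var a = new Collector<int>();
        var b = new Collector<int>();

        IDisposable subA = evens.Subscribe(a);
        IDisposable subB = evens.Subscribe(b);
        int whileBoth = source.ObserverCount;   // one wrapper observer per subscription

        source.Publish(2);
        subA.Dispose();
        int afterA = source.ObserverCount;
        source.Publish(4);
        source.Publish(5);                      // odd, so the filter drops it
        subB.Dispose();
        int afterB = source.ObserverCount;

        return new[] { whileBoth, afterA, afterB, a.Items.Count, b.Items.Count };
    }
}
```

The only place the wrapper observers are ever stored is the source's own list, and each one leaves that list when its subscription is disposed. The `WhereObservable` itself is never consulted again after `Subscribe` returns.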

In reality, Rx has some actual classes for anonymous observable/observer that take delegates and forward the interface calls to those delegates. It uses closures to create those delegates. The compiler does not need to emit classes that actually implement the interfaces, but the spirit of the translation remains the same.
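A minimal sketch of that delegate-forwarding idea, written from scratch rather than copied from Rx: `DelegateObserver`, `DelegateObservable`, `ArraySource` and `WhereSketch` are hypothetical stand-ins invented for this example, and the real Rx types may differ in detail. The point is only that the closures capture `source`, `filter` and `obs`, never the observable that `WhereSketch` returns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that forwards each interface call to a delegate.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Hypothetical observable that forwards Subscribe to a delegate.
public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Tiny cold source used only to exercise the sketch: replays its items on Subscribe.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArraySource(params T[] items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return NoOp.Instance;
    }

    private sealed class NoOp : IDisposable
    {
        public static readonly NoOp Instance = new NoOp();
        public void Dispose() { }
    }
}

public static class SketchOperators
{
    // Closure-based filter operator; the lambdas capture source, filter and obs only.
    public static IObservable<T> WhereSketch<T>(this IObservable<T> source, Func<T, bool> filter)
    {
        return new DelegateObservable<T>(obs =>
            source.Subscribe(new DelegateObserver<T>(
                v => { if (filter(v)) obs.OnNext(v); },
                obs.OnError,
                obs.OnCompleted)));
    }

    // Usage example: collects the even values from 1..4.
    public static List<int> Demo()
    {
        var seen = new List<int>();
        new ArraySource<int>(1, 2, 3, 4)
            .WhereSketch(v => v % 2 == 0)
            .Subscribe(new DelegateObserver<int>(seen.Add, e => { }, () => { }));
        return seen;
    }
}
```

Compared with the hand-written classes, the compiler-generated closure objects play the role of `WhereObserver` and `WhereObservable`, so the reference picture is the same.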

Gideon Engelberth answered Oct 10 '22 23:10


I think I've reached a conclusion with the help of Gideon's answer and by breaking down a sample Where method:

I assumed incorrectly that each downstream IObservable was referenced by the upstream at all times (in order to push events down when needed). But this would root downstreams in memory for the lifetime of the upstream.

In fact, each upstream IObservable is referenced by the downstream IObservable (waiting, ready to hook up an IObserver when required). This roots upstreams in memory as long as the downstream is referenced (which makes sense, because while a downstream is still referenced somewhere, a subscription may occur at any time).

However, when a subscription does occur, an upstream-to-downstream reference chain does get formed, but only on the IDisposable implementation objects that manage the subscriptions at each observable stage, and only for the lifetime of that subscription. This also makes sense: while a subscription exists, each upstream stage's 'processing logic' must still be held in memory to handle the events being passed through to reach the final subscriber IObserver.

This gives a solution to both problems. While an IObservable is referenced, it will hold all source (upstream) IObservables in memory, ready for a subscription. And while a subscription exists, it will hold all downstream subscriptions in memory, allowing the final subscription to still receive events even though its source IObservable may no longer be referenced.

Applying this to the example in my question, the Where and Cast downstream observables are very short-lived: they are referenced only until the Subscribe(observer) call completes, and are then free to be collected. The fact that the intermediate observables may now be collected does not cause a problem for the subscription just created, as it has formed its own subscription object chain (upstream -> downstream) that is rooted by the source eventStream observable. This chain will be released as soon as each downstream stage disposes its IDisposable subscription tracker.
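One way to poke at this conclusion is with a WeakReference. It tracks the intermediate observable without keeping it alive. The sketch below uses only the BCL, and `HotSource`, `PositiveOnly`, `ListSink` and `GcDemo` are hypothetical names made up for it. Garbage collection timing is not guaranteed, so treat the `IntermediateAlive` value as an observation (I would expect it to be false in a typical Release run) rather than something to rely on. The delivery count is the deterministic part.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

// Hypothetical long-lived source standing in for eventStream.
public sealed class HotSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Removal(() => _observers.Remove(observer));
    }

    public void Publish(int value)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }
}

// Disposable that runs a delegate when disposed.
public sealed class Removal : IDisposable
{
    private readonly Action _dispose;
    public Removal(Action dispose) { _dispose = dispose; }
    public void Dispose() => _dispose();
}

// Intermediate observable: references its upstream, and nothing references it back.
public sealed class PositiveOnly : IObservable<int>
{
    private readonly IObservable<int> _source;
    public PositiveOnly(IObservable<int> source) { _source = source; }

    public IDisposable Subscribe(IObserver<int> observer) =>
        _source.Subscribe(new Wrapper(observer));

    // A nested class in C# carries no hidden reference to an outer instance.
    private sealed class Wrapper : IObserver<int>
    {
        private readonly IObserver<int> _inner;
        public Wrapper(IObserver<int> inner) { _inner = inner; }
        public void OnNext(int value) { if (value > 0) _inner.OnNext(value); }
        public void OnError(Exception error) => _inner.OnError(error);
        public void OnCompleted() => _inner.OnCompleted();
    }
}

public sealed class ListSink : IObserver<int>
{
    public List<int> Items { get; } = new List<int>();
    public void OnNext(int value) => Items.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class GcDemo
{
    // Kept out of line so the caller never holds the intermediate in a local.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static IDisposable SubscribeViaTemporary(HotSource source, ListSink sink, out WeakReference weak)
    {
        var intermediate = new PositiveOnly(source);
        weak = new WeakReference(intermediate);
        return intermediate.Subscribe(sink);
    }

    public static (bool IntermediateAlive, int Delivered) Run()
    {
        var source = new HotSource();
        var sink = new ListSink();
        IDisposable subscription = SubscribeViaTemporary(source, sink, out WeakReference weak);

        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        source.Publish(-1);              // dropped by the filter
        source.Publish(7);               // delivered through the wrapper held by the source
        bool alive = weak.IsAlive;       // whether the intermediate survived the collection
        subscription.Dispose();
        source.Publish(8);               // not delivered: the wrapper has been removed
        return (alive, sink.Items.Count);
    }
}
```

Whatever the collector decides about the intermediate, the subscription keeps delivering until it is disposed, because the only thing it depends on is the wrapper observer sitting in the source's list.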

Tyson answered Oct 10 '22 23:10