Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RX2.0: ObjectDisposedException after diposing EventLoopScheduler

We have recently ported the system from RX 1.11111 to RX 2.0 and discovered this problem. We use an EventLoopScheduler for ObserveOn like this:

IDisposable subscription = someSubject
    .ObserveOn(m_eventLoopScheduler)
    .SomeMoreRXFunctions()
    .Subscribe((something)=>something)

The scheduler is disposed on application exit (m_eventLoopScheduler.Dispose). Before that we dispose of all the subscriptions to the observable (subscription.Dispose).

Despite that, we are getting an ObjectDisposedException inside the EventLoopScheduler.Schedule. It's impossible to catch that exception because it originates in an RX thread. It's almost like the Dispose doesn't get rid of all the items in some queue.

We tried to remove the call to EventLoopScheduler.Dispose and the exception disappeared. But then the code in SomeMoreRXFunctions() was executed for about 10 more times although all the subscriptions were disposed.

Is there some other way to properly close the EventLoopScheduler?

like image 250
Lena Avatar asked Oct 28 '12 13:10

Lena


2 Answers

Some observations about subscriptions

(Sorry, couldn't resist the pun!) IObservable<out T>, the interface implemented by almost every Rx operator, has just one vital method:

IDisposable Subscribe(IObserver<T> observer);

It is purely through this method and the disposal of it's return value that an observer (implementing IObserver<T>) can determine when a subscription starts and ends.

When a subscription is made to an Observable that is part of a chain, generally (either directly or indirectly), this will result in a subscription further up the chain. Precisely if and when this happens is down to that given Observable.

In many cases, the relationship between subscriptions received to subscriptions made is not one-to-one. An example of this is Publish(), which will only have at most one subscription to its source, regardless of the number of subscriptions it receives. This is really the whole point of Publish.

In other cases, the relationship has a temporal aspect. For example, Concat() won't subscribe to its second stream until the first has OnCompleted() - which could be never!

It's worth taking a moment here to examine the Rx Design Guidelines, as they have some very relevant things to say:

Rx Design Guidelines

4.4. Assume a best effort to stop all outstanding work on Unsubscribe. When unsubscribe is called on an observable subscription, the observable sequence will make a best effort attempt to stop all outstanding work. This means that any queued work that has not been started will not start.

Any work that is already in progress might still complete as it is not always safe to abort work that is in progress. Results from this work will not be signalled to any previously subscribed observer instances.

The bottom line

Note the implications here; the bottom line is that it's entirely down to the implementation of an Observable when any upstream subscriptions might be made or disposed. In other words, there is absolutely no guarantee that disposing of subscriptions will cause an Observable to dispose any or all of the subscriptions it has either made directly or indirectly. And that goes for any other resources (such as scheduled actions) used by the operator or it's upstream subscriptions.

The best you can hope for is that the author of every upstream operator has indeed made a best effort to stop all outstanding work.

Back to the question (at last!)

Without seeing the content of SomeMoreRXFunctions I can't be certain, but it seems highly likely that the exception you are seeing is being caused because - in spite of disposing the subscriptions you know about - by disposing the scheduler you have ripped the rug from under the feet of still running subscriptions. Effectively, you are causing this:

void Main()
{
    var scheduler = new EventLoopScheduler();

    // Decide it's time to stop
    scheduler.Dispose();

    // The next line will throw an ObjectDisposedException
    scheduler.Schedule(() => {});
}

It's easy to write a perfectly reasonable operator that can cause this problem - even one that doesn't directly use a scheduler! Consider this:

public static class ObservableExtensions
{
    public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
        (this IObservable<TSource> source, IObservable<TDelay> delay)
    {
        return Observable.Create<TSource>(observer =>
        {        
            var subscription = new SerialDisposable();
            subscription.Disposable = delay
                .IgnoreElements()
                .Subscribe(_ => {}, () => {
                    Console.WriteLine("Waiting to subscribe to source");
                    // Artifical sleep to create a problem
                    Thread.Sleep(TimeSpan.FromSeconds(2));
                    Console.WriteLine("Subscribing to source");
                    // Is this line safe?
                    subscription.Disposable = source.Subscribe(observer);
                }); 
            return subscription;
        });
    }    
}

This operator will subscribe to the source once the passed delay observable has completed. Look how reasonable it is - it uses a SerialDisposable to correctly present the two underlying temporally separate subscriptions to it's observer as a single disposable.

However, it's trivial to subvert this operator and get it to cause an exception:

void Main()
{
    var scheduler = new EventLoopScheduler();
    var rx = Observable.Range(0, 10, scheduler)
                       .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
    var subs = rx.Subscribe();

    Thread.Sleep(TimeSpan.FromSeconds(2));
    subs.Dispose();
    scheduler.Dispose();    
}

What's happening here? We are creating a Range on the EventLoopScheduler, but attaching our ReasonableDelay with delay stream created with a Timer using it's default scheduler.

Now we subscribe, wait until our delay stream is completed, then we dispose our subscription and the EventLoopScheduler in the "right order".

The artifical delay I inserted with Thread.Sleep ensures a race condition that could easily occur naturally - the delay has completed, the subscription has been disposed but it's too late to prevent the Range operator accessing the disposed EventLoopScheduler.

We can even tighten up our reasonable efforts to check if the observer has unsubscribed once the delay portion has completed:

// In the ReasonableDelay method
.Subscribe(_ => {}, () => {        
    if(!subscription.IsDisposed) // Check for unsubscribe
    {
        Console.WriteLine("Waiting to subscribe to source");
        // Artifical sleep to create a problem            
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine("Subscribing to source");
        // Is this line safe?                    
        subscription.Disposable = source.Subscribe(observer);
    }
}); 

It won't help. There's no way to use locking semantics purely in the context of this operator either.

What you're doing wrong

You have no business disposing that EventLoopScheduler! Once you have passed it to other Rx Operators, you have passed on the responsibility for it. It's up to the Rx Operators to follow the guidelines an clean up their subscriptions in as timely a manner as possible - which would mean directly or indirectly cancelling any pending scheduled items on the EventLoopScheduler and stopping any further scheduling so that it's queue empties as quickly as possible.

In the example above, you could attribute the issue to the somewhat contrived use of multiple schedulers and the forced Sleep in ReasonableDelay - but it's not hard to image a genuine scenario where an operator can't clean up immediately.

Essentially, by disposing the Rx scheduler you are doing the Rx equivalent of a thread abort. And just as in that scenario, you may have exceptions to handle!

The right thing to do is pull apart the mysterious SomeMoreRXFunctions() and ensure they are adhering to the guidelines as much as is reasonably possible.

like image 144
James World Avatar answered Sep 21 '22 10:09

James World


Just noticed this question as a link to this one: Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException after dispose

Shall repost here what I did there - I'm not aware of any way to "flush" the scheduler, but you can wrap/handle the inevitable "object disposed" exception this way:

EventLoopScheduler scheduler = new EventLoopScheduler();
var wrappedScheduler = scheduler.Catch<Exception>((ex) => 
{
    Console.WriteLine("Got an exception:" + ex.ToString());
    return true;
});

for (int i = 0; i < 100; ++i)
{
    var handle = Observable.Interval(TimeSpan.FromMilliseconds(1))
                           .ObserveOn(wrappedScheduler)
                           .Subscribe(Observer.Create<long>((x) => Thread.Sleep(1000)));

    handles.Add(handle);
}
like image 24
JerKimball Avatar answered Sep 20 '22 10:09

JerKimball