Observable.Generate with TimeSpan selector appears to leak memory [When using a TimeSpan > 15ms]

I am investigating the use of Observable.Generate to create a sequence of results sampled at intervals, using the examples from the MSDN website as a starting point.

The following code WITHOUT a TimeSpan selector does not exhibit a memory leak:

IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));

However, the following code WITH a TimeSpan selector exhibits a memory leak:

TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString(),
                                              timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));

For example, this toy app will quickly show the memory leak using the Memory Profiler which ships with VS 2015 Community:

using System;
using System.Reactive.Linq;

namespace Sample
{
    public class Program
    {
        static void Main()
        {
            IObservable<string> obs = Observable.Generate(
                1,
                x => x < 1000 * 1000,
                x => x + 1,
                x => x.ToString(),
                x => TimeSpan.FromMilliseconds(500));
            obs.Subscribe(x => { /*Do nothing but simply run the observable*/ });
            Console.ReadLine();
        }
    }
}

The memory leak is a growing collection of:

System.Reactive.Disposables StableCompositeDisposable.Binary
System.Reactive.Disposables SingleAssignmentDisposable

Am I using this API incorrectly? Should I expect the memory to grow or is this a bug with Reactive?

Benjamin Osborne asked Dec 19 '16 13:12


1 Answer

This does look like a bug to me, or at least messy/undesirable behaviour in the DefaultScheduler's "recursive" scheduling implementation. (It's not really recursive: I'm talking about the overload that passes the scheduler itself in to a scheduled action so you can schedule a continuation.)

The disposables you are seeing build up are created by the call to the DefaultScheduler.Schedule method (line 71 here: https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs).

There are a couple of reasons why other attempts here to spot this failed. Firstly, the disposables ARE eventually disposed, but only when the Generate sequence completes or errors, at which point the System.Reactive.AnonymousSafeObserver<T> returned by Generate when you subscribe to it does its clean-up.

Secondly, if you use a short TimeSpan (remember the .NET Timer minimum resolution is 15ms anyway), Rx will optimize away the timer and call QueueUserWorkItem instead, so these disposables never get created.

If you dig into Generate's implementation (https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs) you can see that it takes the IDisposable returned by the initial call to Schedule and passes it back to the observer, which hangs on to it until error/completion. That prevents the entire resulting chain of recursive calls from being collectable, and it means that every scheduled action's disposable is disposed only if you cancel or when clean-up finally happens.
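The retention pattern described above can be pictured with a toy model. This is only a sketch and not the Rx source: ScheduledLink, ChainDemo, Simulate and CountReachable are hypothetical names, and the model assumes each timed Schedule call returns a disposable that ends up referencing the disposable of the continuation it schedules.

```csharp
using System;

// Toy stand-in for the disposable returned by one timed Schedule call.
// Hypothetical type for illustration; it is NOT an Rx class.
public sealed class ScheduledLink : IDisposable
{
    public ScheduledLink Next { get; private set; }
    public bool IsDisposed { get; private set; }

    // Models the scheduled action running and scheduling its continuation:
    // this link now references the continuation's disposable.
    public ScheduledLink Run()
    {
        Next = new ScheduledLink();
        return Next;
    }

    // Disposing any link disposes everything scheduled after it.
    public void Dispose()
    {
        for (var link = this; link != null; link = link.Next)
        {
            link.IsDisposed = true;
        }
    }
}

public static class ChainDemo
{
    // Simulates `ticks` timer firings and returns the root disposable,
    // i.e. the one a subscriber would hold until completion.
    public static ScheduledLink Simulate(int ticks)
    {
        var root = new ScheduledLink();
        var current = root;
        for (int i = 0; i < ticks; i++)
        {
            current = current.Run();
        }
        return root;
    }

    // Counts how many links are still reachable from the given root.
    public static int CountReachable(ScheduledLink root)
    {
        int count = 0;
        for (var link = root; link != null; link = link.Next)
        {
            count++;
        }
        return count;
    }

    public static void Main()
    {
        var root = Simulate(1000);
        Console.WriteLine(CountReachable(root)); // root plus 1000 continuations: 1001
        root.Dispose();
    }
}
```

In this model, anything that keeps the root alive keeps every later link alive too, which is the shape of growth the profiler shows. Dropping the root would let the garbage collector reclaim the links that have already run.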

You can see the same effect in the code below, which uses the DefaultScheduler directly: the reference to cancel in the last line is enough to cause the leak. Make sure to use a release build, otherwise cancel is kept alive until the end of the method regardless.

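// ensure you are using a release build of this code
// requires: using System; using System.Threading;
//           using System.Reactive.Concurrency; using System.Reactive.Disposables;
ManualResetEvent mre = new ManualResetEvent(false); // the constructor needs an initial state; start unsignalled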
IDisposable cancel;
int maxCount = 20;

TimeSpan timeSpan = TimeSpan.FromSeconds(1);

Func<IScheduler, int, IDisposable> recurse = null;
recurse = (self, state) =>
{
    Console.WriteLine(state);

    if (state == maxCount)
    {
        mre.Set();
        return Disposable.Empty;
    }

    return self.Schedule(state + 1, timeSpan, recurse);
};

cancel = Scheduler.Default.Schedule(1, timeSpan, recurse);

mre.WaitOne();

// uncomment the following line, and you'll get the same leak
// leave it commented, and cancel reference is GC'd early and there's no leak
// if(cancel == null) Console.WriteLine("Hang on to cancel");

I used the JetBrains dotMemory API to take memory dumps and draw the conclusions here. I've stripped those API calls from the code above, but there is a full gist here if you have that product, and you'll be able to see the impact of uncommenting the final line quite clearly: https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c Alternatively, you could use the MS profiler API, which I don't have paged into my brain's working set at the moment!
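If neither profiler is to hand, a much cruder check is to sample the managed heap with GC.GetTotalMemory while a timed Generate is running. The following is a rough sketch under stated assumptions: LeakProbe, HeapAfterGc and Sample are hypothetical names, the period and pause values are arbitrary, and heap totals are only a coarse signal compared with a real memory dump. Whether the readings actually grow may depend on the Rx version and the period used.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class LeakProbe
{
    // Forces a full collection and returns the managed heap size in bytes.
    public static long HeapAfterGc() => GC.GetTotalMemory(forceFullCollection: true);

    // Runs an endless timed Generate and records the heap size after each pause.
    public static long[] Sample(int samples, TimeSpan period, TimeSpan pause)
    {
        IObservable<int> source = Observable.Generate(
            0,
            x => true,      // never completes, so clean-up on completion never runs
            x => x + 1,
            x => x,
            x => period);   // the timeSelector overload discussed above

        var readings = new long[samples];
        IDisposable subscription = source.Subscribe(_ => { });

        for (int i = 0; i < samples; i++)
        {
            Thread.Sleep(pause);
            readings[i] = HeapAfterGc();
        }

        // Disposing here also keeps the subscription referenced during sampling.
        subscription.Dispose();
        return readings;
    }

    public static void Main()
    {
        long[] readings = Sample(10, TimeSpan.FromMilliseconds(20), TimeSpan.FromSeconds(2));
        foreach (long bytes in readings)
        {
            Console.WriteLine(bytes);
        }
    }
}
```

Readings that climb steadily across samples would be consistent with the retained chain of disposables. Flat readings would not rule it out, so treat this as a first look before reaching for a proper dump.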

James World answered Oct 14 '22 07:10