I am investigating the use of Observable.Generate to create a sequence of results sampled at intervals, using the examples from the MSDN website as a starting point.
The following code WITHOUT a TimeSpan selector does not exhibit a memory leak:
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));
However, the following code WITH a TimeSpan selector exhibits a memory leak:
TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString(),
                                              timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));
For example, this toy app will quickly show the memory leak using the Memory Profiler which ships with VS 2015 Community:
using System;
using System.Reactive.Linq;

namespace Sample
{
    public class Program
    {
        static void Main()
        {
            IObservable<string> obs = Observable.Generate(
                1,
                x => x < 1000 * 1000,
                x => x + 1,
                x => x.ToString(),
                x => TimeSpan.FromMilliseconds(500));
            obs.Subscribe(x => { /* Do nothing but simply run the observable */ });
            Console.ReadLine();
        }
    }
}
The memory leak is a growing collection of:
System.Reactive.Disposables StableCompositeDisposable.Binary
System.Reactive.Disposables SingleAssignmentDisposable
Am I using this API incorrectly? Should I expect the memory to grow or is this a bug with Reactive?
This does look like a bug to me - or at least messy/undesirable behaviour in the DefaultScheduler's "recursive" scheduling implementation (it's not really recursive, I'm talking about the overload that passes in the scheduler itself to a scheduled action so you can schedule a continuation).
The disposables you are seeing build up are created by the call to the DefaultScheduler.Schedule method (line 71 here: https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs).
There are a couple of reasons why other attempts here to spot this failed. Firstly, the disposables ARE eventually disposed, but only when the Generate OnCompletes or OnErrors, at which point the System.Reactive.AnonymousSafeObserver<T> returned by Generate when you subscribe to it does its clean-up.
Secondly, if you use a short TimeSpan (remember the .NET Timer minimum resolution is 15ms anyway) then Rx will optimize away the timer and call QueueUserWorkItem instead, so these disposables are never created.
If you dig into Generate's implementation (https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs) you can see that it passes the IDisposable returned by the initial call to Schedule back to the observer, which hangs on to it until error/completion. That prevents the entire resulting chain of recursive calls from being collected, and it means that every scheduled action's disposable is disposed only when you cancel or when clean-up happens.
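To make that retention chain a bit more concrete, here is a rough toy model. It deliberately does not use Rx at all: Slot, RetentionModel and Demo are names made up for this illustration, and the real scheduler types are more involved than this. The only point it tries to show is that while the first disposable is referenced, every later link stays reachable, and disposal only cascades when that first one is disposed.

```csharp
using System;

// Hypothetical stand-in for illustration only; this is NOT an Rx type.
// Each scheduled action gets a slot that later receives the disposable
// returned by the next scheduled action.
public sealed class Slot : IDisposable
{
    public IDisposable Next;               // set when the action runs and reschedules
    public bool IsDisposed { get; private set; }

    public void Dispose()
    {
        IsDisposed = true;
        Next?.Dispose();                   // disposal cascades down the chain
    }
}

public static class RetentionModel
{
    // Simulates `steps` chained schedule calls and returns the head of the chain,
    // which plays the role of the IDisposable returned by the first Schedule call.
    public static Slot BuildChain(int steps)
    {
        var head = new Slot();
        var current = head;
        for (int i = 1; i < steps; i++)
        {
            var next = new Slot();
            current.Next = next;           // the previous slot keeps the next one alive
            current = next;
        }
        return head;
    }

    // Counts how many slots are still reachable from the head reference.
    public static int CountReachable(Slot head)
    {
        int count = 0;
        for (var s = head; s != null; s = s.Next as Slot) count++;
        return count;
    }
}

public static class Demo
{
    public static void Main()
    {
        Slot head = RetentionModel.BuildChain(1000);
        // Every link is still reachable for as long as `head` is referenced.
        Console.WriteLine(RetentionModel.CountReachable(head));
        head.Dispose();                    // only now is every link disposed
    }
}
```

In this model the count grows with the number of simulated schedule calls, which is the same shape as the growing collection the profiler reports; it says nothing about the actual sizes or types involved in Rx.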
You can see the same effect in the code below, which uses the DefaultScheduler directly: the reference to cancel in the last line is enough to cause the leak. Make sure to use a release build, otherwise cancel will be kept alive until the end of the method regardless.
// ensure you are using a release build of this code
// requires: using System; using System.Threading;
//           using System.Reactive.Concurrency; using System.Reactive.Disposables;
ManualResetEvent mre = new ManualResetEvent(false); // starts unsignalled
IDisposable cancel;
int maxCount = 20;
TimeSpan timeSpan = TimeSpan.FromSeconds(1);
Func<IScheduler, int, IDisposable> recurse = null;
recurse = (self, state) =>
{
    Console.WriteLine(state);
    if (state == maxCount)
    {
        mre.Set();
        return Disposable.Empty;
    }
    // schedule the continuation; its disposable is returned to the previous call
    return self.Schedule(state + 1, timeSpan, recurse);
};
cancel = Scheduler.Default.Schedule(1, timeSpan, recurse);
mre.WaitOne();
// uncomment the following line, and you'll get the same leak
// leave it commented, and cancel reference is GC'd early and there's no leak
// if(cancel == null) Console.WriteLine("Hang on to cancel");
I used the JetBrains dotMemory API to take memory dumps and draw conclusions here. I've stripped the code above of those API calls, but there is a full gist here if you have that product, and you'll be able to see the impact of uncommenting the final line quite clearly: https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c Alternatively, you could use the MS profiler API, which I don't have paged into my brain's working set at the moment!