The following Rx.NET code will use up about 500 MB of memory after about 10 seconds on my machine.
    var stream =
        Observable.Range(0, 10000)
                  .SelectMany(i => Observable.Generate(
                      0,
                      j => true,
                      j => j + 1,
                      j => new { N = j },
                      j => TimeSpan.FromMilliseconds(1)));

    stream.Subscribe();
If I use the Observable.Generate overload without a Func<int, TimeSpan> parameter, my memory usage plateaus at 35 MB.
    var stream =
        Observable.Range(0, 10000)
                  .SelectMany(i => Observable.Generate(
                      0,
                      j => true,
                      j => j + 1,
                      j => new { N = j }));
                      // j => TimeSpan.FromMilliseconds(1))); ** Removed! **

    stream.Subscribe();
It seems to only be a problem when using SelectMany() or Merge() extension methods.
The difference comes down to which default scheduler each overload uses.
With the TimeSpan version the scheduler is the DefaultScheduler. Without TimeSpan it is CurrentThreadScheduler.
So the time-based Generate very rapidly tries to schedule all of the operations and builds up a massive queue of events waiting to be executed. That queue is what uses a load of memory.
With the non-time-based generate it's using the current thread so it will produce and consume each generated value in series and thus use very little memory.
Oh, and this isn't a memory leak. It's just the normal operation if you try to schedule an infinite number of values faster than they can be consumed.
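If you want to see the two defaults side by side, you can pass the scheduler explicitly instead of relying on the overload to pick it. The following is only a minimal sketch, assuming the System.Reactive NuGet package is referenced. The Take(5) and ToList().Wait() calls are my additions so that the program terminates; they are not part of the original code.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // Time-based overload, with the scheduler it would pick by default
        // (DefaultScheduler) written out explicitly.
        var timed = Observable.Generate(
            0,
            j => true,
            j => j + 1,
            j => new { N = j },
            j => TimeSpan.FromMilliseconds(1),
            DefaultScheduler.Instance);

        // Non-time-based overload, with the scheduler it would pick by default
        // (CurrentThreadScheduler) written out explicitly.
        var immediate = Observable.Generate(
            0,
            j => true,
            j => j + 1,
            j => new { N = j },
            CurrentThreadScheduler.Instance);

        // Take(5) keeps each infinite sequence finite (illustration only);
        // ToList().Wait() blocks until the sequence has completed.
        var fromTimed = timed.Take(5).ToList().Wait();
        var fromImmediate = immediate.Take(5).ToList().Wait();

        // Print the N values collected from each sequence.
        Console.WriteLine(string.Join(", ", fromTimed.Select(x => x.N)));
        Console.WriteLine(string.Join(", ", fromImmediate.Select(x => x.N)));
    }
}
```

Writing the scheduler out like this also gives you one place to swap in a different scheduler when experimenting with the memory behaviour.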
I decompiled the code to work out which schedulers were used.
Here's the non-time-based decompile:
    public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
    {
        if (condition == null)
            throw new ArgumentNullException("condition");
        if (iterate == null)
            throw new ArgumentNullException("iterate");
        if (resultSelector == null)
            throw new ArgumentNullException("resultSelector");
        return Observable.s_impl.Generate<TState, TResult>(initialState, condition, iterate, resultSelector);
    }

    public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
    {
        return (IObservable<TResult>)new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
    }

    internal static IScheduler Iteration
    {
        get
        {
            return (IScheduler)CurrentThreadScheduler.Instance;
        }
    }
The above methods are from Observable, QueryLanguage, and SchedulerDefaults respectively.