I'm trying to do a POC with numerous independent data feeds, a sort of classical observer-style application. The number of data feeds might vary from a few hundred to a few thousand, and the number of observers might vary from 2 to 20,000. Here's a quick mock-up of a simple data feed observable:
```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public class FeedMockUp
{
    private readonly IScheduler observerScheduler;
    private readonly Random rnd = new Random((int)DateTime.Now.Ticks);
    private readonly Subject<double> sourceObservable;
    private readonly IObservable<double> testedObservable;

    public FeedMockUp(IScheduler observerScheduler)
    {
        this.observerScheduler = observerScheduler;
        sourceObservable = new Subject<double>();

        // Each subscriber gets its own subscription to the underlying subject;
        // every tick of the source is mapped to a fresh random value.
        testedObservable =
            Observable.Create<double>(x =>
            {
                var underlyingSourceDisposable =
                    sourceObservable
                        .Subscribe(_ => x.OnNext(rnd.NextDouble()));
                return underlyingSourceDisposable;
            });
    }

    public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
    {
        int counter = 0;
        var disposable = new CompositeDisposable();
        for (int i = 0; i < numberOfSubscribers; i++)
        {
            // Every observer is marshalled onto observerScheduler through its own ObserveOn.
            disposable.Add(testedObservable
                .ObserveOn(observerScheduler)
                .Subscribe(_ => Interlocked.Increment(ref counter)));
        }
        return disposable;
    }

    public void PushNewFeed()
    {
        sourceObservable.OnNext(rnd.NextDouble());
    }
}
```
While I was playing around with the schedulers to improve the throughput of the observable updates, I noticed that with EventLoopScheduler the memory consumption of the application was predictable: with 100 data feeds and 1000 observers it was ~100 MB, and it grew linearly as more observers were added.
However, when I tried TaskPoolScheduler, the x86 process started throwing OutOfMemoryException, and on x64 memory consumption exploded, or rather became quite unpredictable, ranging anywhere from 1 GB to 2 GB for just 500 observers and growing almost exponentially as more observers were added.
Here's the code I've been using for testing. Can you see what's wrong with it? Why is there such a difference in performance? My guess is that some internal copying/queuing is involved, but it's only a guess. Ideally I'd like to find out what's happening here without diving into the Rx code base...
```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;

internal static class Program
{
    private static void Main(string[] args)
    {
        const int displayItemCount = 100;
        const int callbackCount = 500;

        // IScheduler rtScheduler = new EventLoopScheduler();
        IScheduler rtScheduler = TaskPoolScheduler.Default;

        // displayItemCount feeds, each with callbackCount observers.
        var rtFeeds = new List<FeedMockUp>();
        for (int i = 0; i < displayItemCount; i++)
        {
            var mockFeed = new FeedMockUp(rtScheduler);
            mockFeed.SubscribeToUnderlyingFeed(callbackCount);
            rtFeeds.Add(mockFeed);
        }

        // Push a single value through every feed.
        foreach (var rtFeedMockUp in rtFeeds)
        {
            rtFeedMockUp.PushNewFeed();
        }

        Console.WriteLine("Memory used for feed {0} mockups with {1} observers / callbacks. Memory {2} Mb",
            displayItemCount, callbackCount, Environment.WorkingSet / (1024 * 1024));
        Console.ReadKey();
    }
}
```
You probably want to use TaskPoolScheduler.Default.DisableOptimizations(typeof(ISchedulerLongRunning)). This option is preferable if you still want to execute work in parallel but want it to run on thread pool threads. The EventLoopScheduler is a good alternative if you don't mind losing parallelism.
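A minimal sketch of the DisableOptimizations suggestion, assuming a recent System.Reactive NuGet package. It wraps TaskPoolScheduler.Default so that it no longer advertises ISchedulerLongRunning, then subscribes many observers through ObserveOn and checks whether their callbacks ran on thread pool threads. DisableLongRunningSketch, CreateScheduler and AllCallbacksOnPoolThreads are hypothetical names used only for illustration; in the question's Main the relevant change is just the line that creates rtScheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

internal static class DisableLongRunningSketch
{
    // TaskPoolScheduler with its ISchedulerLongRunning capability hidden, so
    // operators such as ObserveOn cannot ask it for a dedicated long-running thread.
    public static IScheduler CreateScheduler() =>
        TaskPoolScheduler.Default.DisableOptimizations(typeof(ISchedulerLongRunning));

    // Subscribes observerCount observers via ObserveOn(scheduler), pushes one value,
    // waits for every callback, and reports whether all of them ran on pool threads.
    public static bool AllCallbacksOnPoolThreads(IScheduler scheduler, int observerCount)
    {
        var source = new Subject<int>();
        var done = new CountdownEvent(observerCount);
        int nonPoolCallbacks = 0;

        for (int i = 0; i < observerCount; i++)
        {
            source.ObserveOn(scheduler).Subscribe(_ =>
            {
                if (!Thread.CurrentThread.IsThreadPoolThread)
                    Interlocked.Increment(ref nonPoolCallbacks);
                done.Signal();
            });
        }

        source.OnNext(42);
        done.Wait();
        return Volatile.Read(ref nonPoolCallbacks) == 0;
    }

    private static void Main()
    {
        IScheduler scheduler = CreateScheduler();

        // AsLongRunning() returns null when the wrapper hides ISchedulerLongRunning.
        Console.WriteLine($"Long-running exposed: {scheduler.AsLongRunning() != null}");
        Console.WriteLine($"All callbacks on pool threads: {AllCallbacksOnPoolThreads(scheduler, 500)}");
    }
}
```

The wrapper keeps the parallelism of the task pool; it only stops ObserveOn from treating the scheduler as one that can hand out a dedicated thread per subscription.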
Using ObserveOn with TaskPoolScheduler is essentially going to create a LongRunning task for each observer, and the default TaskScheduler ends up creating a dedicated Thread for each LongRunning task. Each Thread reserves roughly 1 MB for its stack.
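The thread-per-LongRunning-task behaviour can be checked without Rx at all. This sketch (LongRunningThreadDemo and RunsOnPoolThread are hypothetical names) starts one ordinary task and one TaskCreationOptions.LongRunning task on TaskScheduler.Default and reports whether each body ran on a thread pool worker. It illustrates the BCL behaviour the explanation relies on, not Rx's own code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

internal static class LongRunningThreadDemo
{
    // Starts a task on TaskScheduler.Default with the given options and returns
    // whether its body executed on a thread pool worker thread.
    public static bool RunsOnPoolThread(TaskCreationOptions options)
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default);
        return task.Result;
    }

    private static void Main()
    {
        Console.WriteLine($"Default task on pool thread:     {RunsOnPoolThread(TaskCreationOptions.None)}");
        Console.WriteLine($"LongRunning task on pool thread: {RunsOnPoolThread(TaskCreationOptions.LongRunning)}");
    }
}
```

With the default scheduler the first line should report True and the second False, because the LongRunning task is given a newly created thread (with its own stack) instead of a pool worker.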
So 500 observers using the TaskPoolScheduler will reserve at least 500 MB. You can see where this is going...
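For a rough sense of scale, assume about 1 MB of reserved stack per thread, and assume that every ObserveOn subscription in the question's Main ends up with its own long-running thread. With displayItemCount = 100 and callbackCount = 500:

$$
N = 100 \times 500 = 50\,000 \ \text{threads}, \qquad
50\,000 \times 1\ \text{MB} = 50\,000\ \text{MB} \approx 48.8\ \text{GB reserved}
$$

A 32-bit process normally has only 2 GB of user address space (up to 4 GB when large-address-aware on 64-bit Windows), which leaves room for at most a few thousand such stacks, consistent with the OutOfMemoryException in the x86 build. Reserved stack pages that are never touched are not counted in Environment.WorkingSet, so on x64 the printed figure can be far below this reservation.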
The EventLoopScheduler, on the other hand, runs on a single thread, so using ObserveOn with this scheduler effectively just adds an entry to the scheduler's work queue. That entry is much smaller than the 1 MB cost of a Thread.
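The single-thread behaviour can be observed directly. This sketch assumes the System.Reactive package; EventLoopSketch and DistinctCallbackThreads are hypothetical names. It marshals many observers through ObserveOn onto one EventLoopScheduler, pushes one value, and counts the distinct threads that ran the callbacks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

internal static class EventLoopSketch
{
    // Subscribes observerCount observers via ObserveOn(EventLoopScheduler),
    // pushes one value, and returns how many distinct threads ran the callbacks.
    public static int DistinctCallbackThreads(int observerCount)
    {
        using (var loop = new EventLoopScheduler())
        {
            var source = new Subject<int>();
            var done = new CountdownEvent(observerCount);
            var threadIds = new ConcurrentDictionary<int, bool>();

            for (int i = 0; i < observerCount; i++)
            {
                source.ObserveOn(loop).Subscribe(_ =>
                {
                    threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, true);
                    done.Signal();
                });
            }

            source.OnNext(42);
            done.Wait(); // every callback has run before we count threads
            return threadIds.Count;
        }
    }

    private static void Main()
    {
        Console.WriteLine($"Distinct callback threads: {DistinctCallbackThreads(1000)}");
    }
}
```

All 1000 callbacks should run on the scheduler's one thread, one after another; the per-observer cost is a queued work item rather than a thread, which is also why the notifications are delivered serially.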
So the EventLoopScheduler is much more memory efficient for this scenario, but it also notifies the observers serially: if there are a lot of observers and the source is producing at a high frequency, you'll start to accumulate a buffer of unsent events.
The TaskPoolScheduler is less memory efficient, but it notifies observers concurrently and thus can potentially handle higher-frequency events than the EventLoopScheduler by utilizing all of the cores on your machine.