
Will this Reactive Extensions code leak memory?

I have written some code which turns the FileSystemWatcher's Changed event into an observable sequence.

My goal is to split all file system changes into separate streams, one per file, and throttle each of them.

For example, if I have 10 different files which each change 3 times in half a second, I should only get one notification for each file.

What concerns me, though, is the GroupBy() operator. For this to work, I assume it has to keep building up groups over time, each consuming a small amount of memory.

Will this cause a "leak" and if so, how can I prevent it?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") {
    EnableRaisingEvents = true,
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
};

void Main()
{
    var fileSystemEventStream = 
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
            (
                _ => _watcher.Changed += _, 
                _ => _watcher.Changed -= _
            )
            .ObserveOn(ThreadPoolScheduler.Instance)
            .SubscribeOn(ThreadPoolScheduler.Instance)
            .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
            ;

    var res = 
        from fileGroup in fileSystemEventStream
        from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
        select file;

    res.Subscribe(
        ReceiveFsFullPath, 
        exception => {
            Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace);
        });

    Console.Read();
}

void ReceiveFsFullPath(string s){
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine(s);
}
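The concern about GroupBy can be checked without a FileSystemWatcher. The following is a minimal sketch, assuming the System.Reactive package and a C# 9+ console project with top-level statements; a Subject<string> stands in for the stream of full paths, and the counter names are hypothetical. It shows that a group opened for a path stays open until the whole source completes.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stand-in for the FileSystemWatcher stream: each value is a full path.
var paths = new Subject<string>();
int groupsOpened = 0, groupsClosed = 0;

using var subscription = paths
    .GroupBy(path => path)
    .Subscribe(g =>
    {
        groupsOpened++;
        // A GroupBy group only completes when GroupBy's source completes.
        g.Subscribe(_ => { }, () => groupsClosed++);
    });

foreach (var p in new[] { "a.txt", "b.txt", "a.txt", "c.txt" })
    paths.OnNext(p);

// Three distinct paths, three groups, none released yet.
Console.WriteLine($"opened={groupsOpened}, closed={groupsClosed}"); // opened=3, closed=0

// Only completing the whole source closes the groups.
paths.OnCompleted();
Console.WriteLine($"opened={groupsOpened}, closed={groupsClosed}"); // opened=3, closed=3
```

For a long-running watcher the source never completes, so every path ever seen keeps its group.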
Razor asked Aug 03 '13 02:08




2 Answers

Yes. For each new key, GroupBy creates a Subject and keeps it in a dictionary, and you are subscribing to each of these subjects. That is a small chunk of memory per key that grows over time, with no way to release the old entries. What you really need is for the key to be removed when its throttle timer expires. I cannot think of a way to do this with the built-in operators, so you need a custom operator. Here's a stab at one:

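using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    // Throttles each distinct value independently and drops its bookkeeping
    // as soon as that value's throttled notification has been emitted.
    public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay)
    {
        return Observable.Create<T>(observer =>
        {
            // Each distinct value gets its own inner throttled sequence; Merge flattens them.
            var notifications = new Subject<IObservable<T>>();
            var subscription = notifications.Merge().Subscribe(observer);
            // Only values whose throttle timer is still pending live in this dictionary.
            var d = new Dictionary<T, Subject<T>>();
            var gate = new object();
            var sourceSubscription = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable(subscription, sourceSubscription);
            sourceSubscription.Disposable = source.Subscribe(value =>
            {
                Subject<T> entry;
                bool isNew = false;
                lock (gate)
                {
                    if (!d.TryGetValue(value, out entry))
                    {
                        entry = new Subject<T>();
                        d.Add(value, entry);
                        isNew = true;
                    }
                }

                if (isNew)
                {
                    // FirstAsync ends the inner sequence after one throttled value, which
                    // unsubscribes it; Do removes the dictionary entry at that moment.
                    var o = entry.Throttle(delay).FirstAsync().Do(_ =>
                    {
                        lock (gate)
                        {
                            d.Remove(value);
                        }
                    });
                    notifications.OnNext(o); // Merge subscribes to o synchronously
                }

                // Pushed outside the lock: the Do callback runs on Throttle's timer thread
                // and takes 'gate', so holding 'gate' while calling into Throttle risks a
                // lock-ordering deadlock. A value that races with the timer is effectively
                // absorbed by the notification being emitted for that same value.
                entry.OnNext(value);
            }, notifications.OnError, notifications.OnCompleted);

            return subscriptions;
        });
    }
}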

...
Observable.FromEventPattern(...)
    .Select(e => e.EventArgs.FullPath)
    .ThrottleDistinct(TimeSpan.FromSeconds(1))
    .Subscribe(...);
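For comparison, Rx also ships a built-in GroupByUntil operator: its duration selector ends a group and removes that key from the operator's internal map. The sketch below swaps it in as an alternative to the custom operator (it is not part of the answer above). Each path's group closes once it has been quiet for the throttle period, and the group's last value is then emitted. It assumes System.Reactive and top-level statements; the Subject<string>, the 100 ms delay, and the file names are stand-ins.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

var paths = new Subject<string>();   // stand-in for the watcher's FullPath stream
var delay = TimeSpan.FromMilliseconds(100);
var received = new List<string>();
var gate = new object();

using var subscription = paths
    // A group ends once its own elements have been quiet for 'delay'; ending it
    // removes the key, so a later change for the same path starts a fresh group.
    .GroupByUntil(path => path, g => g.Throttle(delay))
    // When a group ends, emit only the last value it saw.
    .SelectMany(g => g.TakeLast(1))
    .Subscribe(path => { lock (gate) received.Add(path); });

paths.OnNext("a.txt");
paths.OnNext("a.txt");
paths.OnNext("b.txt");
paths.OnNext("a.txt");

Thread.Sleep(500); // give both quiet periods time to elapse

lock (gate)
    Console.WriteLine(string.Join(", ", received)); // one entry per path, order not guaranteed
```

The trade-off is that the group, its inner throttle, and the TakeLast buffer all exist per active path, but only while that path is still changing.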
Brandon answered Nov 15 '22 00:11


As per Brandon's reply, the subjects will grow and have no way of being reclaimed*. My main concern about leaking memory here, though, is that you don't capture the subscription, i.e.

res.Subscribe(...

must be replaced with

subscription = res.Subscribe(...

If you don't capture the subscription, you can never dispose of it, so the event handlers are never released and you have "leaked memory". Of course, capturing it is no use unless you actually dispose of the subscription somewhere.
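To see why the returned IDisposable matters, here is a minimal sketch, assuming System.Reactive and top-level statements. Instead of a FileSystemWatcher it uses a local delegate and a counter (hypothetical names) so the attached handlers can be observed: the handler Rx adds on Subscribe is removed only when the captured subscription is disposed.

```csharp
using System;
using System.Reactive.Linq;

EventHandler<EventArgs>? changed = null; // stand-in for _watcher.Changed
int attachedHandlers = 0;
int received = 0;

var events = Observable.FromEventPattern<EventArgs>(
    h => { changed += h; attachedHandlers++; },   // invoked when subscribed
    h => { changed -= h; attachedHandlers--; });  // invoked when disposed

// Capture the subscription so it can be disposed later.
IDisposable subscription = events.Subscribe(_ => received++);
Console.WriteLine($"handlers after Subscribe: {attachedHandlers}");

changed?.Invoke(null, EventArgs.Empty);  // delivered to the subscriber

subscription.Dispose();                  // Rx detaches its handler here
Console.WriteLine($"handlers after Dispose: {attachedHandlers}");

changed?.Invoke(null, EventArgs.Empty);  // no handler left, nothing delivered
```

Without the Dispose call, the event source keeps a reference to the Rx handler, and through it to the whole subscribed pipeline.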

*Well, if the groups completed, they would be disposed automatically, so that would work. Could you complete a file's sequence when a Deleted event comes through for that file?
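The footnote's idea can be sketched with GroupByUntil, whose duration selector completes a group and drops its key when the duration sequence produces a value. This is a minimal sketch, assuming System.Reactive and top-level statements. Two Subject<string> instances stand in for the watcher's Changed and Deleted events; in real code they would presumably come from FromEventPattern on _watcher.Changed and _watcher.Deleted.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var changes = new Subject<string>();    // stand-in for Changed (FullPath)
var deletions = new Subject<string>();  // stand-in for Deleted (FullPath)
int groupsOpened = 0, groupsClosed = 0;

using var subscription = changes
    // A path's group lives until a deletion arrives for that same path.
    .GroupByUntil(path => path, g => deletions.Where(deleted => deleted == g.Key))
    .Subscribe(g =>
    {
        groupsOpened++;
        g.Subscribe(_ => { }, () => groupsClosed++);
    });

changes.OnNext("a.txt");
changes.OnNext("b.txt");
deletions.OnNext("a.txt");   // completes and releases the a.txt group
changes.OnNext("a.txt");     // a later change starts a fresh group

Console.WriteLine($"opened={groupsOpened}, closed={groupsClosed}"); // opened=3, closed=1
```

Files that are never deleted still keep their groups, so this bounds memory only when deletions are common.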

Lee Campbell answered Nov 15 '22 00:11