I have written some code that turns the FileSystemWatcher's Changed event into an observable sequence.
My goal is to split all file system changes into separate streams, one per file, and throttle each of them.
For example, if I have 10 different files that each change 3 times within half a second, I'll get only one notification per file.
What concerns me, though, is the GroupBy() operator. For this to work, I assume it has to keep building up groups over time, consuming a small amount of memory for each one.
Will this cause a "leak" and, if so, how can I prevent it?
// Namespaces used: System, System.IO, System.Threading, System.Reactive.Linq
// and System.Reactive.Concurrency (Rx comes from the System.Reactive package).
FileSystemWatcher _watcher = new FileSystemWatcher("d:\\")
{
    EnableRaisingEvents = true,
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
};

void Main()
{
    // Wrap the watcher's Changed event as an observable and group the events by full path.
    var fileSystemEventStream =
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
        (
            _ => _watcher.Changed += _,
            _ => _watcher.Changed -= _
        )
        .ObserveOn(ThreadPoolScheduler.Instance)
        .SubscribeOn(ThreadPoolScheduler.Instance)
        .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
        ;

    // Throttle each file's group independently, then flatten back into one stream.
    var res =
        from fileGroup in fileSystemEventStream
        from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
        select file;

    res.Subscribe(
        ReceiveFsFullPath,
        exception =>
        {
            Console.WriteLine("Something went wrong - " + exception.Message + " " + exception.StackTrace);
        });

    Console.Read();
}

void ReceiveFsFullPath(string s)
{
    Console.WriteLine("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine(s);
}
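To make the concern concrete, the following plain C# model (no Rx; `GroupRouter` is a hypothetical name, and this is not Rx's actual implementation) mimics the bookkeeping a GroupBy-style operator needs: one entry per distinct key, created the first time the key is seen and kept for as long as the source keeps running.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of GroupBy-style bookkeeping (not Rx's real code):
// the first value for a key creates a group entry, later values reuse it,
// and no entry is ever removed while the source keeps running.
public sealed class GroupRouter<TKey, TValue>
{
    private readonly Func<TValue, TKey> _keySelector;
    // One entry per group; the int counts values routed to it and stands in
    // for the per-key Subject a real GroupBy would hold.
    private readonly Dictionary<TKey, int> _groups = new Dictionary<TKey, int>();

    public GroupRouter(Func<TValue, TKey> keySelector)
    {
        _keySelector = keySelector;
    }

    // Number of groups currently kept in memory.
    public int GroupCount => _groups.Count;

    public void OnNext(TValue value)
    {
        TKey key = _keySelector(value);
        _groups.TryGetValue(key, out int routed);   // routed is 0 for a new key
        _groups[key] = routed + 1;                   // creates the entry on first sight
    }
}
```

Ten paths changed three times each leave ten entries, and every new path adds another one; nothing ever shrinks the dictionary, which is the slow growth the question is worried about.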
Yes. For each new key, GroupBy creates a Subject and keeps all of these subjects in a dictionary, and you subscribe to each of them. That is a small chunk of memory per key that grows over time, with no way to release the old entries. What you really need is for a key to be removed when its throttle timer expires. I cannot think of a way to do this with the built-in operators, so you need a custom operator. Here's a stab at one.
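using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    // Throttles each distinct value separately and forgets a value once its
    // throttled notification has been emitted, so the dictionary only holds
    // keys whose throttle window is still open.
    public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay)
    {
        return Observable.Create<T>(observer =>
        {
            // Each per-key throttled stream is pushed here and merged into the output.
            var notifications = new Subject<IObservable<T>>();
            var subscription = notifications.Merge().Subscribe(observer);
            var d = new Dictionary<T, Subject<T>>();
            var gate = new object();
            var sourceSubscription = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable(subscription, sourceSubscription);
            sourceSubscription.Disposable = source.Subscribe(value =>
            {
                Subject<T> entry;
                bool isNew = false;
                lock (gate)
                {
                    if (!d.TryGetValue(value, out entry))
                    {
                        entry = new Subject<T>();
                        d.Add(value, entry);
                        isNew = true;
                    }
                }
                if (isNew)
                {
                    // Take the first throttled value for this key, then remove the key
                    // so a later change starts a fresh window.
                    var o = entry.Throttle(delay).FirstAsync().Do(_ =>
                    {
                        lock (gate)
                        {
                            d.Remove(value);
                        }
                    });
                    // Merge subscribes to o here, before the first value is pushed.
                    notifications.OnNext(o);
                }
                // Pushed outside the lock: Throttle may run the Do callback (which takes
                // gate) while holding its own internal lock, so calling OnNext under gate
                // risks a deadlock. A value arriving just as its key's window closes can
                // still be missed.
                entry.OnNext(value);
            }, notifications.OnError, notifications.OnCompleted);
            return subscriptions;
        });
    }
}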
...
Observable.FromEventPattern(...)
    .Select(e => e.EventArgs.FullPath)
    .ThrottleDistinct(TimeSpan.FromSeconds(1))
    .Subscribe(...);
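The idea behind ThrottleDistinct, forgetting a key as soon as its throttle window closes, can be checked without Rx or real timers. The sketch below is a hypothetical single-threaded model (`KeyedDebouncer`, `Push` and `Advance` are invented names, not part of Rx) that takes timestamps explicitly: each push restarts that key's quiet period, and `Advance` emits and removes every key whose quiet period has elapsed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical single-threaded model of per-key throttling with cleanup.
// Time is passed in explicitly so the behaviour is deterministic.
public sealed class KeyedDebouncer<TKey>
{
    private readonly TimeSpan _delay;
    // Live keys -> the time at which that key's quiet period ends.
    private readonly Dictionary<TKey, DateTime> _deadlines = new Dictionary<TKey, DateTime>();

    public KeyedDebouncer(TimeSpan delay)
    {
        _delay = delay;
    }

    // Number of keys whose window is still open (the only state retained).
    public int PendingKeys => _deadlines.Count;

    // A new event for a key restarts its quiet period, like Throttle does.
    public void Push(TKey key, DateTime now)
    {
        _deadlines[key] = now + _delay;
    }

    // Returns every key whose quiet period has elapsed and removes it,
    // so memory is bounded by the number of keys that are still active.
    public List<TKey> Advance(DateTime now)
    {
        var due = _deadlines.Where(kv => kv.Value <= now).Select(kv => kv.Key).ToList();
        foreach (var key in due)
        {
            _deadlines.Remove(key);
        }
        return due;
    }
}
```

Pushing ten files three times within 400 ms leaves ten pending keys; advancing to one second after the last push emits each file once and leaves the dictionary empty, so memory tracks only the files that are still changing.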
As Brandon's reply says, the subjects will grow and have no way of being reclaimed*. My main memory-leak concern here, though, is that you don't capture the subscription. That is,
res.Subscribe(...
must be replaced with
subscription = res.Subscribe(...
If you don't capture the subscription, you can never dispose of it, so the event handlers are never released and you have "leaked memory". Of course, capturing it is no use unless you actually dispose of the subscription somewhere.
*Well, if the groups completed, they would be disposed of automatically, so that would work. Perhaps you could complete a file's sequence when a Deleted event comes through for it?
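Why capturing and disposing the subscription matters can be shown with an ordinary .NET event. Disposing a subscription created by FromEventPattern removes the event handler; the hypothetical `Watcher` and `EventSubscription` types below (invented for illustration, not Rx types) mimic that, so you can see that the publisher keeps the handler, and anything its closure captures, referenced until Dispose is called.

```csharp
using System;

// Hypothetical stand-in for FileSystemWatcher: a publisher with one event.
public sealed class Watcher
{
    public event Action<string> Changed;

    // How many handlers the publisher is currently keeping alive.
    public int HandlerCount => Changed?.GetInvocationList().Length ?? 0;

    public void Raise(string path) => Changed?.Invoke(path);
}

// Hypothetical subscription: attaches the handler on creation and detaches it
// on Dispose, mirroring what disposing an Rx event subscription does.
public sealed class EventSubscription : IDisposable
{
    private Watcher _watcher;
    private readonly Action<string> _handler;

    public EventSubscription(Watcher watcher, Action<string> onNext)
    {
        _watcher = watcher;
        _handler = onNext;
        _watcher.Changed += _handler;
    }

    public void Dispose()
    {
        if (_watcher == null) return;   // safe to call more than once
        _watcher.Changed -= _handler;
        _watcher = null;
    }
}
```

Until `Dispose` runs, `HandlerCount` stays at 1 and the watcher keeps the callback alive; afterwards it drops to 0 and raised events no longer reach the callback.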