
Reactive Extensions vs FileSystemWatcher

One of the things that has long bugged me about the FileSystemWatcher is the way it fires multiple events for a single logical change to a file. I know why it happens, but I don't want to have to care - I just want to reparse the file once, not 4-6 times in a row. Ideally, there would be an event that only fires when a given file is done changing, rather than every step along the way.

Over the years I've come up with various solutions to this problem, of varying degrees of ugliness. I thought Reactive Extensions would be the ultimate solution, but there's something I'm not doing right, and I'm hoping someone can point out my mistake.

I have an extension method:

public static IObservable<IEvent<FileSystemEventArgs>> GetChanged(this FileSystemWatcher that)
{
    return Observable.FromEvent<FileSystemEventArgs>(that, "Changed");
}

Ultimately, I would like to get one event per filename, within a given time period - so that four events in a row with a single filename are reduced to one event, but I don't lose anything if multiple files are modified at the same time. BufferWithTime sounds like the ideal solution.

var bufferedChange = watcher.GetChanged()
    .Select(e => e.EventArgs.FullPath)
    .BufferWithTime(TimeSpan.FromSeconds(1))
    .Where(e => e.Count > 0)
    .Select(e => e.Distinct());

When I subscribe to this observable, a single change to a monitored file triggers my subscription method four times in a row, which rather defeats the purpose. If I remove the Distinct() call, I see that each of the four calls contains two identical events - so there is some buffering going on. Increasing the TimeSpan passed to BufferWithTime seems to have no effect - I went as high as 20 seconds without any change in behavior.

This is my first foray into Rx, so I'm probably missing something obvious. Am I doing it wrong? Is there a better approach? Thanks for any suggestions...

Joel Mueller, asked Apr 20 '10 19:04


3 Answers

Just to warm up an old topic, as I'm working on that right now, too:

Admittedly, this matters less when you watch a single file, as FileSystemWatcher only fires a Changed event about every 3 seconds for one file when you track Size via

_fileSystemWatcher.NotifyFilter = NotifyFilters.Size | ....

But suppose FileSystemWatcher does fire many events in a row (say, many files are changed, renamed, or created). For the benefit of other people reading this:

You don't want to use Throttle or BufferWithTime in this case. The name Throttle is a bit misleading: it suppresses all output until the given TimeSpan has elapsed without an event. That means it may never fire at all if you use something like Throttle(TimeSpan.FromMilliseconds(200)) and the pause after every event is shorter than 200 ms. So it's not really the "throttling" people expect. It's good for user input, when you want to wait until the user has stopped typing. It's bad for load throttling.

BufferWithTime is also not what you want: it just fills a time-based buffer. That's good when you have a high initial cost per event, like opening a connection to a web service. In that case you would want to batch-process events every "time" seconds. But it doesn't help with load balancing, as the number of events doesn't change.

The solution is the Sample(TimeSpan time) method: it takes the last event within a TimeSpan, which is the "real" Throttle. I think the Rx guys really messed up the naming in this case.
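To make the difference concrete, here is a minimal sketch, assuming the System.Reactive NuGet package and a current Rx.NET release. The `Observable.Interval` source is only a hypothetical stand-in for a steady burst of watcher events, and the timings are arbitrary.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class ThrottleVsSample
{
    static void Main()
    {
        // Hypothetical stand-in for a burst of watcher events:
        // one value every 100 ms, 30 values in total (about 3 seconds).
        // Interval is cold, so each subscription below gets its own timer.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(30);

        // Throttle waits for 200 ms of silence before emitting. The gaps here
        // are only 100 ms, so it should stay quiet while the burst is running.
        using var throttled = source
            .Throttle(TimeSpan.FromMilliseconds(200))
            .Subscribe(x => Console.WriteLine($"Throttle: {x}"));

        // Sample emits the most recent value once per second,
        // no matter how busy the source is.
        using var sampled = source
            .Sample(TimeSpan.FromSeconds(1))
            .Subscribe(x => Console.WriteLine($"Sample:   {x}"));

        // Keep the process alive long enough for both pipelines to finish.
        Thread.Sleep(4000);
    }
}
```

If this behaves as described above, the Sample lines should show up roughly once a second during the burst, while Throttle should only report something once the source goes quiet.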

hko, answered Oct 23 '22 18:10


You could use a group by to aggregate file system events per filename, and use the resulting observable with the Throttle extension method. I've written a small sample using integers, but the basic idea is the same.

var obs = from n in Enumerable.Range(1, 40).ToObservable()
    group n by n / 10 into g
    select new { g.Key, Obs = g.Throttle(TimeSpan.FromMilliseconds(10.0)) } into h
    from x in h.Obs
    select x;
obs.Subscribe(x => Console.WriteLine(x));

outputs:

9 
19 
29 
39 
40 

That is, for each group (n / 10), the last observed integer.
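Applied to the watcher itself, the same idea might look like the sketch below. It assumes the System.Reactive NuGet package and the current `FromEventPattern` API rather than the 2010-era `FromEvent` shown in the question; the watched folder and the one-second quiet period are placeholders, not anything from the original posts.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

class PerFileThrottle
{
    static void Main(string[] args)
    {
        // Hypothetical choice: watch the folder given as the first argument,
        // or the current directory if none is given.
        var folder = args.Length > 0 ? args[0] : Directory.GetCurrentDirectory();
        using var watcher = new FileSystemWatcher(folder);

        // Turn the Changed event into an observable of full paths.
        var changes = Observable
            .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                h => watcher.Changed += h,
                h => watcher.Changed -= h)
            .Select(e => e.EventArgs.FullPath);

        // One group per path; each group is throttled on its own, so a busy
        // file does not delay or swallow notifications for another file.
        var settled = changes
            .GroupBy(path => path)
            .SelectMany(group => group.Throttle(TimeSpan.FromSeconds(1)));

        using var subscription = settled.Subscribe(
            path => Console.WriteLine($"Done changing: {path}"));

        watcher.EnableRaisingEvents = true;
        Console.WriteLine($"Watching {folder}. Press Enter to exit.");
        Console.ReadLine();
    }
}
```

One caveat with this sketch: the groups never complete, so a long-running process keeps one group alive per distinct path it has seen. If that matters, GroupByUntil is probably worth a look.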

Joost Morsink, answered Oct 23 '22 19:10


My mistake. Somehow I've got multiple FileSystemWatchers monitoring each other's folders. The observable was triggering once for each watcher, but BufferWithTime appears to be working correctly. I still need to figure out why my watchers are firing events for folders I thought they were configured to ignore, but that's got nothing to do with Rx or this question.

In fact, maybe I can punt on that problem, and switch to having a single watcher monitoring a parent folder, using Rx to filter out events from folders I'm not interested in.
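A rough sketch of that single-watcher idea, assuming the System.Reactive NuGet package and current Rx.NET names (`FromEventPattern`, and `Buffer` in place of the older `BufferWithTime`). The subfolder names `include-a` and `include-b` and the helper `IsUnderAny` are hypothetical, and the case-insensitive path comparison assumes a Windows-style file system.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;

static class ParentFolderWatch
{
    // Hypothetical helper: true when 'path' lies inside one of 'folders'.
    // The case-insensitive comparison is an assumption (Windows-style paths).
    public static bool IsUnderAny(string path, params string[] folders)
    {
        var full = Path.GetFullPath(path);
        return folders.Any(folder =>
        {
            // The trailing separator keeps "C:\data\a" from matching "C:\data\ab\x".
            var root = Path.GetFullPath(folder).TrimEnd(Path.DirectorySeparatorChar)
                       + Path.DirectorySeparatorChar;
            return full.StartsWith(root, StringComparison.OrdinalIgnoreCase);
        });
    }

    static void Main(string[] args)
    {
        var parent = args.Length > 0 ? args[0] : Directory.GetCurrentDirectory();

        // Hypothetical subfolders of interest; everything else is filtered out.
        var wanted = new[]
        {
            Path.Combine(parent, "include-a"),
            Path.Combine(parent, "include-b"),
        };

        // A single watcher on the parent folder, covering all subfolders.
        using var watcher = new FileSystemWatcher(parent) { IncludeSubdirectories = true };

        var batches = Observable
            .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                h => watcher.Changed += h,
                h => watcher.Changed -= h)
            .Select(e => e.EventArgs.FullPath)
            .Where(path => IsUnderAny(path, wanted))  // drop events from other folders
            .Buffer(TimeSpan.FromSeconds(1))          // collect one second of events
            .Where(batch => batch.Count > 0)          // skip empty windows
            .Select(batch => batch.Distinct());       // one entry per file per window

        using var subscription = batches.Subscribe(paths =>
        {
            foreach (var path in paths)
                Console.WriteLine($"Changed: {path}");
        });

        watcher.EnableRaisingEvents = true;
        Console.WriteLine($"Watching {parent}. Press Enter to exit.");
        Console.ReadLine();
    }
}
```

Filtering before buffering means the one-second windows only ever contain paths from the folders of interest, so the Distinct step stays cheap.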

Joel Mueller, answered Oct 23 '22 18:10