I've been looking at wrapping a file watcher in an observable to aid in processing events, but I'm having some trouble figuring out how to get the behaviour I want out of it. The file watcher watches a directory into which files are placed. When a file is first placed into that directory, the Created event is fired on the file watcher. However, if the file is large or the network connection is slow, a series of Changed events is fired as the file updates. I don't want to process the file until it has finished being written, so what I really need is this timeline:
|Created        |Changed        |Changed        |Changed
____________________________________________________________________________
^Write starts                                   ^Write finishes    ^Processing starts
I looked at a number of methods of filtering the events in Rx, but I couldn't get what I need, which is "fire a function once a file has not been changed for X seconds". Throttle is no good as it will lose events in the middle. Buffer is no good as events might occur on the buffer boundary.
I had thought about using timeouts, but I wasn't keen on the fact that they throw an exception, and I wanted processing to start while other files were still being written, not only once there were no more events at all.
There is a similar question at Reactive Extensions vs FileSystemWatcher which was never really solved.
Is there a method such that I can do this easily? I'm sure this is not an uncommon use case.
ObservableFileSystemWatcher, an observable wrapper around the FileSystemWatcher type, works perfectly. Add a NuGet package named ReactiveFileSystemWatcher and create a console application to test as follows:
using System;
using System.Reactive.Linq;
// plus the namespace exposed by the ReactiveFileSystemWatcher package

class Program
{
    static void Main(string[] args)
    {
        // the callback configures the underlying FileSystemWatcher
        using (var watcher = new ObservableFileSystemWatcher(c => { c.Path = @"C:\FolderToWatch\"; c.IncludeSubdirectories = true; }))
        {
            // each event type is exposed as its own observable
            watcher.Created.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
            watcher.Changed.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
            watcher.Renamed.Select(x => $"{x.OldName} was {x.ChangeType} to {x.Name}").Subscribe(Console.WriteLine);
            watcher.Deleted.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
            watcher.Errors.Subscribe(Console.WriteLine);
            watcher.Start();
            Console.ReadLine();
        }
    }
}
EDIT: after review, don't think you want this...
Mayhap I'm oversimplifying a bit, but wouldn't Throttle be ideal here?
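For reference, here is a minimal sketch of how the Throttle idea might look when applied per file rather than to the whole event stream. It assumes the System.Reactive package; the names QuietPeriod, WhenSettled and Demo are made up for illustration, the one-second quiet period is arbitrary, and the events are reduced to plain path strings. In Rx.NET, Throttle only forwards a value once no newer value has arrived for the given time span, so grouping by path first gives each file its own quiet-period timer. The demo drives virtual time with a HistoricalScheduler instead of real file writes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of Rx or of any watcher package.
public static class QuietPeriod
{
    // Emits a path once no event for that same path has arrived for `quiet`.
    public static IObservable<string> WhenSettled(
        IObservable<string> paths, TimeSpan quiet, IScheduler scheduler)
    {
        return paths
            .GroupBy(path => path)                                 // one inner stream per file
            .SelectMany(file => file.Throttle(quiet, scheduler));  // per-file quiet timer
    }
}

public static class Demo
{
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler();  // virtual clock, advanced by hand
        var events = new Subject<string>();         // stands in for Created/Changed events
        var settled = new List<string>();

        using (QuietPeriod
            .WhenSettled(events, TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(path => settled.Add(path)))
        {
            events.OnNext("a.txt");                               // t = 0 ms
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
            events.OnNext("a.txt");                               // t = 500 ms, restarts a.txt's timer
            events.OnNext("b.txt");                               // t = 500 ms
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(600));
            events.OnNext("b.txt");                               // t = 1100 ms, restarts b.txt's timer
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));  // a.txt quiet since 500 ms, settles at 1500 ms
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));         // b.txt quiet since 1100 ms, settles at 2100 ms
        }
        return settled;
    }
}
```

With a real watcher, the input would presumably be the Created and Changed streams merged and projected to FullPath, with Scheduler.Default as the scheduler. GroupBy keeps one group alive per distinct path, so a long-running watcher might prefer GroupByUntil to expire idle groups; that is left out of this sketch.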
This is by no means "simple", but I think it comes closer to what you want than my previous idea:
(bonus: with a test case! ;) )
// LINQPad-style script. Namespaces needed: System, System.IO, System.Linq, System.Threading,
// System.Reactive.Disposables, System.Reactive.Linq
void Main()
{
    var pathToWatch = @"c:\temp\";
    var fsw = new FileSystemWatcher(pathToWatch);

    // set up observables for created and changed
    var changedObs =
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
            dlgt => fsw.Changed += dlgt,
            dlgt => fsw.Changed -= dlgt);
    var createdObs =
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
            dlgt => fsw.Created += dlgt,
            dlgt => fsw.Created -= dlgt);

    // the longest we'll wait between the last file write and calling it "changed"
    var maximumTimeBetweenWrites = TimeSpan.FromSeconds(1);

    // A "pulse" ticking off every 10ms (adjust this as desired)
    var timer = Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
        .Select(i => DateTime.Now);

    var watcher =
        from creation in createdObs
        from change in changedObs
            // we only care about changes matching a create
            .Where(changeEvt => changeEvt.EventArgs.Name == creation.EventArgs.Name)
            // take latest of (pulse, changes) and select (event, time since last file write)
            .CombineLatest(timer, (evt, now) => new {
                Change = evt,
                DeltaFromLast = now.Subtract(new FileInfo(evt.EventArgs.FullPath).LastWriteTime)})
            // skip everything until we pass the "time before considered changed" threshold
            .SkipWhile(evt => evt.DeltaFromLast < maximumTimeBetweenWrites)
            // then lock on that until we change a different file
            .Distinct(evt => evt.Change.EventArgs.FullPath)
        select change.Change;

    var disp = new CompositeDisposable();

    // to show creates
    disp.Add(
        createdObs.Subscribe(
            evt => Console.WriteLine("New file:{0}",
                evt.EventArgs.FullPath)));

    // to show "final changes"
    disp.Add(
        watcher.Subscribe(
            evt => Console.WriteLine("{0}:{1}:{2}",
                evt.EventArgs.Name,
                evt.EventArgs.ChangeType,
                evt.EventArgs.FullPath)));

    fsw.EnableRaisingEvents = true;

    // test case: append to 10 files in parallel, pausing a random time between writes
    var rnd = new Random();
    Enumerable.Range(0,10)
        .AsParallel()
        .ForAll(i =>
        {
            var filename = Path.Combine(pathToWatch, "foo" + i + ".txt");
            if(File.Exists(filename))
                File.Delete(filename);
            foreach(var j in Enumerable.Range(0, 20))
            {
                // using releases the file handle even if the write throws
                using (var writer = File.AppendText(filename))
                {
                    writer.WriteLine(j);
                }
                int pause;
                lock (rnd) { pause = rnd.Next(500); } // Random is not thread-safe
                Thread.Sleep(pause);
            }
        });

    Console.WriteLine("Press enter to quit...");
    Console.ReadLine();
    disp.Dispose();
    fsw.Dispose();
}