I am using Python Watchdog to monitor a folder on Ubuntu. It's working fine with 1 or 2 files, but when I moved 50 files by command mv *.xml dest_folder then it received only 2 events and processed only 2 files. Below is the code.
def on_moved(self, event):
try:
logger.debug("on_moved event :" + str(event) )
self._validate_xml(event.dest_path)
except Exception as ex:
logger.exception(ex)
If I comment out _validate_xml function then I receive all 45 events.
Can any one tell me what is exactly happened in the Watchdog and what is the best solution for this?
I haven't used Python Watchdog, but from a generic real-time systems perspective,
_validate_xml can be slow, and make you miss events.To more you do while handling an event, the less "real-time" your system becomes. What you can do is offload the xml validity check to another process and exchange messages with a Queue (message would be event.dest_path) the paths you have seen moving. Your event handling will be as simple as putting messages on a queue, and the files can be processed in batch by the consumer of the queue.
In short:
Queue
fork() processon_moved handler, put messages on the queue,_validate_xml.multiprocessing.Pool do validate xml files in parallel.good luck.
EDIT: tested out on my system; most of the comments above seem not to apply because watchdog's code seems to handle threading just fine.
#!/usr/bin/env python
import time
from watchdog.observers import Observer, api
from watchdog.events import LoggingEventHandler, FileSystemEventHandler, FileMovedEvent
import logging
def counter_gen():
count = 0
while True:
count += 1
yield count
class XmlValidatorHandler(FileSystemEventHandler):
sleep_time = 0.1
COUNTER = counter_gen()
def on_moved(self, event):
if isinstance(event, FileMovedEvent):
print '%s - event %d; validate: %s' % (
type(self).__name__, self.COUNTER.next(), event.dest_path)
time.sleep(self.sleep_time)
class SlowXmlValidatorHandler(XmlValidatorHandler):
sleep_time = 2
COUNTER = counter_gen()
def get_observer(handler):
observer = Observer(timeout=0.5)
observer.event_queue.maxsize=10
observer.schedule(handler, path='.', recursive=True)
return observer
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
event_handler = LoggingEventHandler()
observer1 = get_observer(XmlValidatorHandler())
observer2 = get_observer(SlowXmlValidatorHandler())
observer1.start()
observer2.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer1.stop()
observer2.stop()
observer1.join()
observer2.join()
Wasn't able to reproduce your issue. some pointers:
maxsize, if you already have items in there and they don't get handled in a timely fashion, then my guess is that the timeout kicks in and the event is lost. You may want to resize in that case.timeout, if it is configured, you may want to tune that parameter.Maybe a more complete snippet would help us help you.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With