I am using Python Watchdog to monitor a folder on Ubuntu. It works fine with 1 or 2 files, but when I moved 50 files with the command mv *.xml dest_folder, it received only 2 events and processed only 2 files. Below is the code.
    def on_moved(self, event):
        try:
            logger.debug("on_moved event :" + str(event))
            self._validate_xml(event.dest_path)
        except Exception as ex:
            logger.exception(ex)
If I comment out the _validate_xml function, then I receive all 45 events.
Can anyone tell me what exactly is happening in Watchdog, and what the best solution for this is?
I haven't used Python Watchdog, but from a generic real-time systems perspective, _validate_xml can be slow and make you miss events. The more you do while handling an event, the less "real-time" your system becomes. What you can do is offload the XML validity check to another process and exchange messages with it through a Queue (each message would be event.dest_path, i.e. a path you have seen being moved). Your event handling then becomes as simple as putting messages on a queue, and the files can be processed in batch by the consumer of the queue.
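A minimal sketch of that producer/consumer split, assuming Python 3. The names QueueingXmlHandler, validate_xml, consume, run and STOP are hypothetical; validate_xml only checks well-formedness with xml.etree.ElementTree as a stand-in for the asker's _validate_xml, and the fallback base class exists only so the sketch imports without watchdog installed.

```python
import multiprocessing
import xml.etree.ElementTree as ET

try:
    # The real base class, when watchdog is installed.
    from watchdog.events import FileSystemEventHandler
except ImportError:
    # Fallback so the sketch can be imported without watchdog.
    FileSystemEventHandler = object

STOP = None  # hypothetical sentinel telling the consumer to exit


class QueueingXmlHandler(FileSystemEventHandler):
    """Keeps the event callback cheap: it only enqueues the destination path."""

    def __init__(self, path_queue):
        super().__init__()
        self.path_queue = path_queue

    def on_moved(self, event):
        self.path_queue.put(event.dest_path)


def validate_xml(path):
    """Hypothetical stand-in for _validate_xml: True if the file is well-formed XML."""
    try:
        ET.parse(path)
        return True
    except (ET.ParseError, OSError):
        return False


def consume(path_queue, validate=validate_xml):
    """Consumer loop: validate queued paths until STOP arrives; return how many were handled."""
    handled = 0
    while True:
        path = path_queue.get()  # blocks until the handler enqueues something
        if path is STOP:
            return handled
        if not validate(path):
            print("invalid XML:", path)
        handled += 1


def run(folder):
    """Hypothetical wiring: the watchdog thread produces, a separate process consumes."""
    import time
    from watchdog.observers import Observer

    paths = multiprocessing.Queue()
    worker = multiprocessing.Process(target=consume, args=(paths,))
    worker.start()
    observer = Observer()
    observer.schedule(QueueingXmlHandler(paths), path=folder, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    paths.put(STOP)  # let the worker finish what is already queued, then exit
    worker.join()
```

You would call run("dest_folder") from an if __name__ == "__main__": block; the guard matters because multiprocessing re-imports the main module in the child when it does not use fork. With this split, on_moved returns almost immediately, so the observer thread spends its time reading events instead of validating files.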
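In short:
1. Create a Queue.
2. fork() a consumer process.
3. In the on_moved handler, put messages on the queue; in the consumer process, take them off and call _validate_xml.
4. Optionally, use multiprocessing.Pool to validate XML files in parallel.
Good luck.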
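For step 4, here is a sketch of a consumer that validates in batches with multiprocessing.Pool. STOP, validate_xml and consume_in_batches are hypothetical names, and validate_xml only checks well-formedness as a stand-in for _validate_xml. The loop blocks for one path, grabs whatever else is already queued, and hands the batch to pool.map.

```python
import queue
import xml.etree.ElementTree as ET

STOP = None  # hypothetical sentinel meaning "no more paths"


def validate_xml(path):
    """Hypothetical stand-in for _validate_xml: True if the file is well-formed XML."""
    try:
        ET.parse(path)
        return True
    except (ET.ParseError, OSError):
        return False


def consume_in_batches(path_queue, pool, batch_size=50):
    """Collect queued paths into batches and validate each batch in parallel."""
    handled = 0
    while True:
        batch = [path_queue.get()]  # wait for at least one item
        while len(batch) < batch_size:
            try:
                batch.append(path_queue.get_nowait())  # take what is already queued
            except queue.Empty:
                break
        paths = [p for p in batch if p is not STOP]
        # pool.map spreads the validation calls over the pool's workers
        for path, ok in zip(paths, pool.map(validate_xml, paths)):
            if not ok:
                print("invalid XML:", path)
        handled += len(paths)
        if len(paths) < len(batch):  # STOP was part of this batch
            return handled
```

In the consumer process this would be called as `with multiprocessing.Pool() as pool: consume_in_batches(path_queue, pool)`. Any object with a map method works as pool, so a multiprocessing.pool.ThreadPool is convenient for testing. Note that multiprocessing.Queue.get_nowait can raise queue.Empty for a moment right after a put, so a batch may be smaller than what was queued; the next iteration picks up the rest.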
EDIT: I tested this on my system; most of the comments above seem not to apply, because watchdog's code appears to handle threading just fine.
#!/usr/bin/env python
import logging
import time

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler, FileSystemEventHandler, FileMovedEvent


def counter_gen():
    count = 0
    while True:
        count += 1
        yield count


class XmlValidatorHandler(FileSystemEventHandler):
    sleep_time = 0.1  # simulated validation time per file
    COUNTER = counter_gen()

    def on_moved(self, event):
        if isinstance(event, FileMovedEvent):
            print('%s - event %d; validate: %s' % (
                type(self).__name__, next(self.COUNTER), event.dest_path))
            time.sleep(self.sleep_time)


class SlowXmlValidatorHandler(XmlValidatorHandler):
    sleep_time = 2  # much slower simulated validation
    COUNTER = counter_gen()


def get_observer(handler):
    observer = Observer(timeout=0.5)
    observer.event_queue.maxsize = 10  # deliberately small event queue
    observer.schedule(handler, path='.', recursive=True)
    return observer


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    event_handler = LoggingEventHandler()
    observer1 = get_observer(XmlValidatorHandler())
    observer2 = get_observer(SlowXmlValidatorHandler())
    observer1.start()
    observer2.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer1.stop()
        observer2.stop()
    observer1.join()
    observer2.join()
I wasn't able to reproduce your issue. Some pointers:
- maxsize: if the event queue already has items in it and they don't get handled in a timely fashion, my guess is that the timeout kicks in and the event is lost. You may want to resize the queue in that case.
- timeout: if it is configured, you may want to tune that parameter.
Maybe a more complete snippet would help us help you.
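The maxsize/timeout guess rests on standard queue.Queue behaviour, sketched below: once a bounded queue is full, a put with a timeout raises queue.Full, and an event dropped at that point is gone. try_enqueue is a hypothetical helper; whether watchdog's emitter actually enqueues events with a timeout is an assumption this sketch does not confirm.

```python
import queue


def try_enqueue(q, item, timeout):
    """Return True if item was queued, False if the queue stayed full past the timeout."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False  # in an event pipeline, this item would simply be dropped


events = queue.Queue(maxsize=2)  # deliberately tiny bounded queue
results = [try_enqueue(events, n, timeout=0.1) for n in range(3)]
print(results)  # [True, True, False]
```

Setting maxsize to 0 (or any value <= 0) makes a queue.Queue unbounded, which trades the risk of dropped items for unbounded memory use if the consumer falls behind.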