I am looking for an example of a Flask-based web application that uses a watchdog observer. More specifically, I want to use the watchdog observer to detect changes in predefined directories and update the web application accordingly. I can find many examples of each on its own, i.e. Flask web applications and watchdog observers.
However, I don't know how to integrate the two and run them together smoothly. Can anyone provide a simple example?
Also, can I run the watchdog observer inside a Celery worker?
Thanks
EDIT: I used a Celery worker to run the watchdog observer to watch a directory and its subdirectories as follows:

@celery.task(bind=True)
def _watcher(self):
    observer = Observer()
    handler = MyHandler()
    # recursive=True is required for subdirectories to be watched as well
    observer.schedule(handler, '.', recursive=True)
    observer.start()
    try:
        while True:
            if not handler.event_q.empty():
                event, ts = handler.event_q.get()
                # Expose the latest event as the task state so the front end can poll it
                self.update_state(state='PROGRESS', meta={'src_path': event.src_path, 'event_type': event.event_type})
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    return {'src_path': 'srcpath', 'event_type': 'eventtype'}

Then the front end sends a GET request every second to pick up any changes. This is a bit hacky.
What I eventually want to achieve is: 1) keep watching a directory and its subdirectories, 2) when anything changes, update a database accordingly, and 3) update the front end based on the changes.
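A minimal sketch of step 2 using the standard-library sqlite3 module. The table name file_events and the helpers init_db, record_event and events_since are hypothetical names chosen for illustration; a watchdog handler would pass event.src_path and event.event_type to record_event.

```python
import sqlite3
import time


def init_db(conn):
    # Hypothetical schema: one row per filesystem event.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS file_events ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "src_path TEXT NOT NULL, "
        "event_type TEXT NOT NULL, "
        "ts REAL NOT NULL)"
    )
    conn.commit()


def record_event(conn, src_path, event_type, ts=None):
    # Store one event; the timestamp defaults to the current time.
    conn.execute(
        "INSERT INTO file_events (src_path, event_type, ts) VALUES (?, ?, ?)",
        (src_path, event_type, time.time() if ts is None else ts),
    )
    conn.commit()


def events_since(conn, last_id=0):
    # Return only events newer than last_id, so a client can fetch the delta.
    rows = conn.execute(
        "SELECT id, src_path, event_type FROM file_events WHERE id > ? ORDER BY id",
        (last_id,),
    ).fetchall()
    return [{"id": r[0], "src_path": r[1], "event_type": r[2]} for r in rows]
```

By default a sqlite3 connection may only be used in the thread that created it, so the watcher thread and the web request handlers would each need their own connection.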
So far, I can update a database based on the changes in the filesystem using watchdog (the MyHandler class in the above code). But I am still looking for a better way to observe the changes within a Flask application and to reflect them in the front end.
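One candidate for pushing changes instead of polling is Server-Sent Events. A minimal sketch, assuming a hypothetical generator sse_stream that reads plain dicts from a queue.Queue and treats None as a stop sentinel; neither the name nor the sentinel comes from the code in this post.

```python
import json
import queue


def sse_stream(event_q, stop=None):
    # Yield each queued item as one Server-Sent Events message.
    # `stop` is a hypothetical sentinel value that ends the stream.
    while True:
        item = event_q.get()  # blocks until the producer puts something
        if item is stop:
            break
        # An SSE message is a "data:" line terminated by a blank line.
        yield "data: " + json.dumps(item) + "\n\n"
```

In Flask, a generator like this can be returned as Response(sse_stream(q), mimetype='text/event-stream') and consumed in the browser with EventSource. With a single shared queue each event reaches only one connected client, so several browsers would each need their own queue.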
I ended up using multithreading.

from flask import Flask, render_template, jsonify
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from queue import Queue
import time
import threading


class MyHandler(FileSystemEventHandler):
    def __init__(self, pattern=None):
        # A tuple of suffixes, because str.endswith accepts a tuple
        self.pattern = pattern or (".xml", ".tiff", ".jpg")
        self.event_q = Queue()
        self.dummyThread = None

    def on_any_event(self, event):
        # Queue events for matching files only; directory events are ignored
        if not event.is_directory and event.src_path.endswith(self.pattern):
            self.event_q.put((event, time.time()))

    def start(self):
        self.dummyThread = threading.Thread(target=self._process)
        self.dummyThread.daemon = True
        self.dummyThread.start()

    def _process(self):
        while True:
            time.sleep(1)


app = Flask(__name__)
handler = MyHandler()
handler.start()

# eventlist_flag acts as a crude lock: 0 = free, 1 = held by the watcher, 2 = held by the view
eventlist_flag = 0
eventlist = []


def run_watcher():
    global eventlist_flag, eventlist
    observer = Observer()
    # recursive=True so that subdirectories are watched too
    observer.schedule(handler, '.', recursive=True)
    observer.start()
    try:
        while True:
            if eventlist_flag == 0:
                eventlist_flag = 1
                # Move everything queued by the handler into the shared list
                while not handler.event_q.empty():
                    event, ts = handler.event_q.get()
                    eventlist.append(event)
                eventlist_flag = 0
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


@app.route('/watcher/status', methods=['POST'])
def watchernow():
    global eventlist_flag, eventlist
    if eventlist_flag == 0 and len(eventlist) > 0:
        eventlist_flag = 2
        for e in eventlist:
            print(e)
        eventlist = []
        eventlist_flag = 0
    return jsonify({})


@app.route('/')
def index():
    return render_template('index.html')


if __name__ == '__main__':
    watcher_thread = threading.Thread(target=run_watcher)
    watcher_thread.start()
    app.run(debug=True)
    watcher_thread.join()
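
The eventlist_flag integer in the code above works as a hand-rolled lock, and the check followed by the assignment is not atomic across threads. A minimal sketch of the same hand-off using threading.Lock, assuming a hypothetical EventBuffer class (not part of the code above) shared by the watcher thread and the Flask view:

```python
import threading


class EventBuffer:
    """Thread-safe list of pending events (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._events = []

    def add(self, event):
        # Called by the watcher thread for each new event.
        with self._lock:
            self._events.append(event)

    def drain(self):
        # Called by the request handler: return all pending events and reset the list.
        with self._lock:
            pending, self._events = self._events, []
        return pending
```

With this, run_watcher would call add() for each item taken from handler.event_q, and watchernow would call drain() and iterate over the result, with no global flag needed.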