I'm trying to identify a good way to watch for the appearance of a file using Python's asyncio library. This is what I've come up with so far:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Watches for the appearance of a file."""

import argparse
import asyncio
import os.path


@asyncio.coroutine
def watch_for_file(file_path, interval=1):
    while True:
        if not os.path.exists(file_path):
            print("{} not found yet.".format(file_path))
            yield from asyncio.sleep(interval)
        else:
            print("{} found!".format(file_path))
            break


def make_cli_parser():
    cli_parser = argparse.ArgumentParser(description=__doc__)
    cli_parser.add_argument('file_path')
    return cli_parser


def main(argv=None):
    cli_parser = make_cli_parser()
    args = cli_parser.parse_args(argv)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(watch_for_file(args.file_path))


if __name__ == '__main__':
    main()
I saved this as watch_for_file.py, and can run it with
python3 watch_for_file.py testfile
In another shell session, I issue
touch testfile
to end the loop.
Is there a more elegant solution than using this infinite loop and yield from asyncio.sleep()?
asyncio makes it possible to handle many concurrent (not parallel!) requests with no extra threads at all (well, just one). However, the requests library does not support asyncio, so you need to create threads to get concurrency with it.
aiofiles is an Apache 2 licensed library, written in Python, for handling local disk files in asyncio applications. Ordinary local file IO is blocking, and cannot easily and portably be made asynchronous. This means doing file IO may interfere with asyncio applications, which shouldn't block the executing thread.
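To illustrate the blocking problem described above, here is a minimal stdlib-only sketch. It is not aiofiles itself; it hands the blocking read to a worker thread with loop.run_in_executor so the event loop stays free. The helper names read_text_blocking, read_text_async and demo are made up for this example.

```python
import asyncio
import os
import tempfile


def read_text_blocking(path):
    """Ordinary blocking read; it runs in a worker thread below."""
    with open(path, "r", encoding="utf-8") as handle:
        return handle.read()


async def read_text_async(path):
    """Await a blocking read without stalling the event loop."""
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    return await loop.run_in_executor(None, read_text_blocking, path)


def demo():
    """Write a temporary file, read it back through the executor, clean up."""
    with tempfile.NamedTemporaryFile(
        "w", suffix=".txt", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write("hello from disk")
        path = tmp.name
    try:
        return asyncio.run(read_text_async(path))
    finally:
        os.remove(path)


if __name__ == "__main__":
    print(demo())
```

The read still happens on a thread, so this only keeps the loop responsive; it does not make the disk access itself asynchronous.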
The method create_task takes a coroutine object as a parameter and returns a Task object, which inherits from asyncio.Future. The call creates the task inside the event loop for the current thread, and schedules it to start executing at the beginning of the coroutine's code block the next time the event loop gets control.
With asyncio, we can create cooperative multitasking programs. This is different from preemptive multitasking, in which the scheduler can force a task to stop in order to run another task.
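A small sketch of what create_task and cooperative scheduling look like in practice. The names worker and run_workers are invented for this example, and asyncio.sleep(0) is used only as an explicit yield point.

```python
import asyncio


async def worker(name, steps, log):
    """Append to the shared log, yielding to the loop after every step."""
    for step in range(steps):
        log.append((name, step))
        # sleep(0) hands control back to the event loop; other tasks
        # can only run at await points like this one.
        await asyncio.sleep(0)


async def run_workers():
    """Schedule two workers and report how they were interleaved."""
    log = []
    # create_task schedules each coroutine; neither has run yet.
    task_a = asyncio.create_task(worker("a", 2, log))
    task_b = asyncio.create_task(worker("b", 2, log))
    started_immediately = bool(log)
    await asyncio.gather(task_a, task_b)
    return started_immediately, isinstance(task_a, asyncio.Future), log


if __name__ == "__main__":
    print(asyncio.run(run_workers()))
```

Nothing preempts a worker between its await points, which is the difference from preemptive multitasking: a task that never awaits would starve the other one.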
Butter is really cool. Another alternative is minotaur, which is similar but only implements inotify:
async def main():
    with Inotify(blocking=False) as n:
        n.add_watch('.', Mask.CREATE | Mask.DELETE | Mask.MOVE)
        async for evt in n:
            print(evt)
Well, there are nicer, platform-specific ways of being notified when a file is created. Gerrat linked to one for Windows in his comment, and pyinotify can be used for Linux. Those platform-specific approaches can probably be plugged into asyncio, but you'd end up writing a whole bunch of code to make it work in a platform-independent way, which probably isn't worth the effort just to check for the appearance of a single file. If you need more sophisticated filesystem watching in addition to this, it might be worth pursuing, though. For example, it looks like pyinotify can be tweaked by adding a subclass of its Notifier class that plugs into the asyncio event loop (there are already classes for tornado and asyncore).
For your simple use-case, I think your infinite loop approach to polling is fine, but you could also just schedule callbacks with the event loop, if you wanted:
def watch_for_file(file_path, interval=1, loop=None):
    if not loop:
        loop = asyncio.get_event_loop()
    if not os.path.exists(file_path):
        print("{} not found yet.".format(file_path))
        # Not there yet: schedule this same function to run again later.
        loop.call_later(interval, watch_for_file, file_path, interval, loop)
    else:
        print("{} found!".format(file_path))
        loop.stop()


def main(argv=None):
    cli_parser = make_cli_parser()
    args = cli_parser.parse_args(argv)
    loop = asyncio.get_event_loop()
    loop.call_soon(watch_for_file, args.file_path)
    loop.run_forever()
I'm not sure this is much more elegant than the infinite loop, though.
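If you are on a newer Python, the polling loop can also be written with async def and await, and asyncio.wait_for lets you give up after a while. This is only a sketch: the watch_with_timeout wrapper, its timeout parameter and the True/False return value are my own additions, not part of the code in the question.

```python
import asyncio
import os.path


async def watch_for_file(file_path, interval=1.0):
    """Poll until file_path exists, sleeping between checks."""
    while not os.path.exists(file_path):
        await asyncio.sleep(interval)
    return file_path


async def watch_with_timeout(file_path, interval=1.0, timeout=None):
    """Return True if the file appeared, False if the timeout expired.

    timeout is a hypothetical extra knob; None means wait forever.
    """
    try:
        await asyncio.wait_for(watch_for_file(file_path, interval), timeout)
    except asyncio.TimeoutError:
        return False
    return True


if __name__ == "__main__":
    found = asyncio.run(watch_with_timeout("testfile", timeout=10))
    print("found!" if found else "gave up")
```

It is still polling underneath, so it is no more efficient than the original loop; it just reads a little more directly and stops on its own.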
Edit:
Just for fun, I wrote a solution using pyinotify:
import pyinotify
import asyncio
import argparse
import os.path


class AsyncioNotifier(pyinotify.Notifier):
    """
    Notifier subclass that plugs into the asyncio event loop.
    """
    def __init__(self, watch_manager, loop, callback=None,
                 default_proc_fun=None, read_freq=0, threshold=0, timeout=None):
        self.loop = loop
        self.handle_read_callback = callback
        pyinotify.Notifier.__init__(self, watch_manager, default_proc_fun, read_freq,
                                    threshold, timeout)
        # Have the event loop call handle_read whenever the inotify fd is readable.
        loop.add_reader(self._fd, self.handle_read)

    def handle_read(self, *args, **kwargs):
        self.read_events()
        self.process_events()
        if self.handle_read_callback is not None:
            self.handle_read_callback(self)


class EventHandler(pyinotify.ProcessEvent):
    def my_init(self, file=None, loop=None):
        if not file:
            raise ValueError("file keyword argument must be provided")
        self.loop = loop if loop else asyncio.get_event_loop()
        self.filename = file

    def process_IN_CREATE(self, event):
        print("Creating:", event.pathname)
        if os.path.basename(event.pathname) == self.filename:
            print("Found it!")
            self.loop.stop()


def make_cli_parser():
    cli_parser = argparse.ArgumentParser(description=__doc__)
    cli_parser.add_argument('file_path')
    return cli_parser


def main(argv=None):
    cli_parser = make_cli_parser()
    args = cli_parser.parse_args(argv)
    loop = asyncio.get_event_loop()

    # set up pyinotify stuff
    wm = pyinotify.WatchManager()
    mask = pyinotify.IN_CREATE  # watched events
    dir_, filename = os.path.split(args.file_path)
    if not dir_:
        dir_ = "."
    wm.add_watch(dir_, mask)
    handler = EventHandler(file=filename, loop=loop)
    # Use the subclass defined above; newer pyinotify releases also ship
    # a similar pyinotify.AsyncioNotifier.
    notifier = AsyncioNotifier(wm, loop, default_proc_fun=handler)

    loop.run_forever()


if __name__ == '__main__':
    main()