
Watch for a file with asyncio

I'm trying to identify a good way to watch for the appearance of a file using Python's asyncio library. This is what I've come up with so far:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Watches for the appearance of a file."""

import argparse
import asyncio
import os.path


@asyncio.coroutine
def watch_for_file(file_path, interval=1):
    while True:
        if not os.path.exists(file_path):
            print("{} not found yet.".format(file_path))
            yield from asyncio.sleep(interval)
        else:
            print("{} found!".format(file_path))
            break


def make_cli_parser():
    cli_parser = argparse.ArgumentParser(description=__doc__)
    cli_parser.add_argument('file_path')
    return cli_parser


def main(argv=None):
    cli_parser = make_cli_parser()
    args = cli_parser.parse_args(argv)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(watch_for_file(args.file_path))

if __name__ == '__main__':
    main()

I saved this as watch_for_file.py, and can run it with

python3 watch_for_file.py testfile

In another shell session, I issue

touch testfile

to end the loop.

Is there a more elegant solution than using this infinite loop and yield from asyncio.sleep()?

gotgenes asked Oct 16 '14 21:10




2 Answers

Butter is really cool. Another alternative is minotaur, which is similar but only implements inotify:

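# Assumes minotaur exposes Inotify and Mask at package level.
from minotaur import Inotify, Mask

async def main():
    # Watch the current directory and print each inotify event as it arrives.
    with Inotify(blocking=False) as n:
        n.add_watch('.', Mask.CREATE | Mask.DELETE | Mask.MOVE)
        async for evt in n:
            print(evt)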
scaramanga answered Sep 22 '22 04:09


Well, there are nicer, platform-specific ways of being notified when a file is created. Gerrat linked to one for Windows in his comment, and pyinotify can be used on Linux. Those platform-specific approaches can probably be plugged into asyncio, but you'd end up writing a whole bunch of code to make it work in a platform-independent way, which probably isn't worth the effort just to check for the appearance of a single file. If you need more sophisticated filesystem watching in addition to this, it might be worth pursuing, though. For example, it looks like pyinotify can be extended with a subclass of its Notifier class that plugs into the asyncio event loop (there are already classes for tornado and asyncore).

For your simple use-case, I think your infinite loop approach to polling is fine, but you could also just schedule callbacks with the event loop, if you wanted:

def watch_for_file(file_path, interval=1, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    if not os.path.exists(file_path):
        print("{} not found yet.".format(file_path))
        loop.call_later(interval, watch_for_file, file_path, interval, loop)
    else:
        print("{} found!".format(file_path))
        loop.stop()

def main(argv=None):
    cli_parser = make_cli_parser()
    args = cli_parser.parse_args(argv)
    loop = asyncio.get_event_loop()
    loop.call_soon(watch_for_file, args.file_path)
    loop.run_forever()

I'm not sure this is much more elegant than the infinite loop, though.
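
For readers on current Python versions: the @asyncio.coroutine decorator used in the question has been removed from recent releases, so the same polling loop would now be written with async def and await. Below is a minimal sketch of that, not a definitive implementation. The name wait_for_file, its timeout parameter, and the demo() helper are my own additions for illustration and are not part of the question; the timeout is handled with asyncio.wait_for.

```python
import asyncio
import os
import tempfile


async def wait_for_file(file_path, interval=1.0, timeout=None):
    """Poll until file_path exists; return True if found, False on timeout."""
    async def poll():
        # Same idea as the question's loop: check, then sleep cooperatively.
        while not os.path.exists(file_path):
            await asyncio.sleep(interval)

    try:
        # timeout=None waits indefinitely, like the original loop.
        await asyncio.wait_for(poll(), timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo():
    """Hypothetical demo: one file that appears, one that never does."""
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "testfile")
        loop = asyncio.get_running_loop()
        # Stand-in for running `touch testfile` in another shell.
        loop.call_later(0.05, lambda: open(target, "w").close())
        found = await wait_for_file(target, interval=0.01, timeout=2)
        missing = await wait_for_file(
            os.path.join(tmp, "nope"), interval=0.01, timeout=0.05
        )
        return found, missing


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the coroutine returns a result instead of stopping the loop, it can be awaited alongside other tasks in the same program.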

Edit:

Just for fun, I wrote a solution using pyinotify:

import pyinotify
import asyncio
import argparse
import os.path


class AsyncioNotifier(pyinotify.Notifier):
    """

    Notifier subclass that plugs into the asyncio event loop.

    """
    def __init__(self, watch_manager, loop, callback=None,
                 default_proc_fun=None, read_freq=0, threshold=0, timeout=None):
        self.loop = loop
        self.handle_read_callback = callback
        pyinotify.Notifier.__init__(self, watch_manager, default_proc_fun, read_freq,
                                    threshold, timeout)
        loop.add_reader(self._fd, self.handle_read)

    def handle_read(self, *args, **kwargs):
        self.read_events()
        self.process_events()
        if self.handle_read_callback is not None:
            self.handle_read_callback(self)


class EventHandler(pyinotify.ProcessEvent):
    def my_init(self, file=None, loop=None):
        if not file:
            raise ValueError("file keyword argument must be provided")
        self.loop = loop if loop else asyncio.get_event_loop()
        self.filename = file

    def process_IN_CREATE(self, event):
        print("Creating:", event.pathname)
        if os.path.basename(event.pathname) == self.filename:
            print("Found it!")
            self.loop.stop()


def make_cli_parser():
    cli_parser = argparse.ArgumentParser(description=__doc__)
    cli_parser.add_argument('file_path')
    return cli_parser


def main(argv=None):
    cli_parser = make_cli_parser()
    args = cli_parser.parse_args(argv)
    loop = asyncio.get_event_loop()

    # set up pyinotify stuff
    wm = pyinotify.WatchManager()
    mask = pyinotify.IN_CREATE  # watched events
    dir_, filename = os.path.split(args.file_path)
    if not dir_:
        dir_ = "."
    wm.add_watch(dir_, mask)
    handler = EventHandler(file=filename, loop=loop)
    # Use the AsyncioNotifier subclass defined above; keep a reference so it stays alive.
    notifier = AsyncioNotifier(wm, loop, default_proc_fun=handler)

    loop.run_forever()

if __name__ == '__main__':
    main()
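
The part of AsyncioNotifier that does the asyncio integration is the loop.add_reader() call, which asks the event loop to invoke a callback whenever the inotify file descriptor becomes readable. As a rough, stdlib-only sketch of that mechanism, the example below uses an os.pipe() as a stand-in for the inotify descriptor. The names wait_readable and demo are hypothetical, and this assumes a Unix selector-based event loop (add_reader is not available on the default Windows loop).

```python
import asyncio
import os


async def wait_readable(fd):
    """Suspend until fd becomes readable, then return the bytes read from it."""
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    def on_readable():
        # Called by the event loop when fd has data; unregister and hand
        # the data to whoever is awaiting the future.
        loop.remove_reader(fd)
        if not ready.done():
            ready.set_result(os.read(fd, 1024))

    loop.add_reader(fd, on_readable)
    return await ready


async def demo():
    """Hypothetical demo: a pipe plays the role of the inotify descriptor."""
    read_fd, write_fd = os.pipe()
    try:
        loop = asyncio.get_running_loop()
        # Simulate a filesystem event arriving a little later.
        loop.call_later(0.05, os.write, write_fd, b"created")
        return await wait_readable(read_fd)
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No polling interval is involved here: the coroutine is resumed only when the descriptor actually has data, which is what makes the inotify approach cheaper than the sleep loop.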
dano answered Sep 24 '22 04:09