Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can luigi rerun tasks when the task dependencies become out of date?

Tags:

python

luigi

As far as I know, a luigi.Target can either exist, or not. Therefore, if a luigi.Target exists, it wouldn't be recomputed.

I'm looking for a way to force recomputation of the task, if one of its dependencies is modified, or if the code of one of the tasks changes.

like image 285
Ophir Yoktan Avatar asked Mar 01 '15 13:03

Ophir Yoktan


People also ask

What is a Luigi task?

A Luigi Task describes a unit or work. The key methods of a Task, which must be implemented in a subclass are: * :py:meth:`run` - the computation done by this task. * :py:meth:`requires` - the list of Tasks that this Task depends on. * :py:meth:`output` - the output :py:class:`Target` that this Task creates.

What is Luigi Python?

Luigi is a Python package that manages long-running batch processing, which is the automated running of data processing jobs on batches of items. Luigi allows you to define a data processing job as a set of dependent tasks. For example, task B depends on the output of task A.

How do I install Luigi?

Run pip install luigi to install the latest stable version from PyPI. Documentation for the latest release is hosted on readthedocs. Run pip install luigi[toml] to install Luigi with TOML-based configs support. For the bleeding edge code, pip install git+https://github.com/spotify/luigi.git .


1 Answers

One way you could accomplish your goal is by overriding the complete(...) method.

The documentation for complete is straightforward.

Simply implement a function that checks your constraint, and returns False if you want to recompute the task.

For example, to force recomputation when a dependency has been updated, you could do:

def complete(self):
    """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
    import os
    import time

    def mtime(path):
        return time.ctime(os.path.getmtime(path))

    # assuming 1 output
    if not os.path.exists(self.output().path):
        return False

    self_mtime = mtime(self.output().path) 

    # the below assumes a list of requirements, each with a list of outputs. YMMV
    for el in self.requires():
        if not el.complete():
            return False
        for output in el.output():
            if mtime(output.path) > self_mtime:
                return False

    return True

This will return False when any requirement is incomplete or any has been modified more recently than the current task or the output of the current task does not exist.

Detecting when code has changed is harder. You could use a similar scheme (checking mtime), but it'd be hit-or-miss unless every task has its own file.

Because of the ability to override complete, any logic you want for recomputation can be implemented. If you want a particular complete method for many tasks, I'd recommend sub-classing luigi.Task, implementing your custom complete there, and then inheriting your tasks from the sub-class.

like image 116
J David Smith Avatar answered Oct 25 '22 16:10

J David Smith